mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
enhance: add configurable memory index load predict memory usage factor (#30561)
related pr: https://github.com/milvus-io/milvus/pull/30475 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
cc46d6bafc
commit
dd957cf9e3
@ -32,6 +32,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
@ -75,13 +76,13 @@ func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo
|
||||
})
|
||||
}
|
||||
|
||||
factor := uint64(1)
|
||||
factor := float64(1)
|
||||
diskUsage := uint64(0)
|
||||
if !isLoadWithDisk {
|
||||
factor = 2
|
||||
factor = paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()
|
||||
} else {
|
||||
diskUsage = uint64(indexInfo.IndexSize)
|
||||
}
|
||||
|
||||
return uint64(indexInfo.IndexSize) * factor, diskUsage, nil
|
||||
return uint64(float64(indexInfo.IndexSize) * factor), diskUsage, nil
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
@ -34,6 +35,10 @@ type IndexAttrCacheSuite struct {
|
||||
c *IndexAttrCache
|
||||
}
|
||||
|
||||
func (s *IndexAttrCacheSuite) SetupSuite() {
|
||||
paramtable.Init()
|
||||
}
|
||||
|
||||
func (s *IndexAttrCacheSuite) SetupTest() {
|
||||
s.c = NewIndexAttrCache()
|
||||
}
|
||||
@ -95,8 +100,8 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
|
||||
memory, disk, err := s.c.GetIndexResourceUsage(info)
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.EqualValues(200, memory)
|
||||
s.EqualValues(0, disk)
|
||||
s.Equal(uint64(250), memory)
|
||||
s.Equal(uint64(0), disk)
|
||||
})
|
||||
|
||||
s.Run("corrupted_index_info", func() {
|
||||
|
@ -1981,6 +1981,8 @@ type queryNodeConfig struct {
|
||||
CleanExcludeSegInterval ParamItem `refreshable:"false"`
|
||||
FlowGraphMaxQueueLength ParamItem `refreshable:"false"`
|
||||
FlowGraphMaxParallelism ParamItem `refreshable:"false"`
|
||||
|
||||
MemoryIndexLoadPredictMemoryUsageFactor ParamItem `refreshable:"true"`
|
||||
}
|
||||
|
||||
func (p *queryNodeConfig) init(base *BaseTable) {
|
||||
@ -2426,7 +2428,6 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
|
||||
DefaultValue: "8192",
|
||||
Doc: "expr eval batch size for getnext interface",
|
||||
}
|
||||
|
||||
p.ExprEvalBatchSize.Init(base.mgr)
|
||||
|
||||
p.CleanExcludeSegInterval = ParamItem{
|
||||
@ -2437,6 +2438,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
|
||||
Export: true,
|
||||
}
|
||||
p.CleanExcludeSegInterval.Init(base.mgr)
|
||||
|
||||
p.MemoryIndexLoadPredictMemoryUsageFactor = ParamItem{
|
||||
Key: "queryNode.memoryIndexLoadPredictMemoryUsageFactor",
|
||||
Version: "2.3.8",
|
||||
DefaultValue: "2.5", // HNSW index needs more memory to load.
|
||||
Doc: "memory usage prediction factor for memory index loaded",
|
||||
}
|
||||
p.MemoryIndexLoadPredictMemoryUsageFactor.Init(base.mgr)
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -361,6 +361,10 @@ func TestComponentParam(t *testing.T) {
|
||||
|
||||
params.Save("querynode.gracefulStopTimeout", "100")
|
||||
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
|
||||
|
||||
assert.Equal(t, 2.5, Params.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
|
||||
params.Save("queryNode.memoryIndexLoadPredictMemoryUsageFactor", "2.0")
|
||||
assert.Equal(t, 2.0, Params.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
|
||||
})
|
||||
|
||||
t.Run("test dataCoordConfig", func(t *testing.T) {
|
||||
|
@ -76,7 +76,7 @@ func DefaultParams() map[string]string {
|
||||
params.LocalStorageCfg.Path.Key: path.Join("/tmp", testPath),
|
||||
params.CommonCfg.StorageType.Key: "local",
|
||||
params.DataNodeCfg.MemoryForceSyncEnable.Key: "false", // local execution will print too many logs
|
||||
params.CommonCfg.GracefulStopTimeout.Key: "10",
|
||||
params.CommonCfg.GracefulStopTimeout.Key: "30",
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
@ -182,6 +183,7 @@ func (s *QueryNodeSuite) search(collectionName string, dim int) {
|
||||
}
|
||||
queryResult, err := c.Proxy.Query(context.TODO(), queryReq)
|
||||
s.NoError(err)
|
||||
s.Equal(queryResult.Status.ErrorCode, commonpb.ErrorCode_Success)
|
||||
s.Equal(len(queryResult.FieldsData), 1)
|
||||
numEntities := queryResult.FieldsData[0].GetScalars().GetLongData().Data[0]
|
||||
s.Equal(numEntities, int64(s.rowsPerCollection))
|
||||
|
Loading…
Reference in New Issue
Block a user