diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 877affce84..99fc5e6552 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -216,6 +216,10 @@ func (node *QueryNode) InitSegcore() error { C.InitCpuNum(cCPUNum) knowhereBuildPoolSize := uint32(float32(paramtable.Get().QueryNodeCfg.InterimIndexBuildParallelRate.GetAsFloat()) * float32(hardware.GetCPUNum())) + if knowhereBuildPoolSize < uint32(1) { + knowhereBuildPoolSize = uint32(1) + } + log.Info("set up knowhere build pool size", zap.Uint32("pool_size", knowhereBuildPoolSize)) cKnowhereBuildPoolSize := C.uint32_t(knowhereBuildPoolSize) C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereBuildPoolSize) diff --git a/pkg/config/manager.go b/pkg/config/manager.go index 0569006735..cf9252eee5 100644 --- a/pkg/config/manager.go +++ b/pkg/config/manager.go @@ -84,16 +84,36 @@ type Manager struct { keySourceMap *typeutil.ConcurrentMap[string, string] // store the key to config source, example: key is A.B.C and source is file which means the A.B.C's value is from file overlays *typeutil.ConcurrentMap[string, string] // store the highest priority configs which modified at runtime forbiddenKeys *typeutil.ConcurrentSet[string] + configCache *typeutil.ConcurrentMap[string, interface{}] } func NewManager() *Manager { - return &Manager{ + manager := &Manager{ Dispatcher: NewEventDispatcher(), sources: typeutil.NewConcurrentMap[string, Source](), keySourceMap: typeutil.NewConcurrentMap[string, string](), overlays: typeutil.NewConcurrentMap[string, string](), forbiddenKeys: typeutil.NewConcurrentSet[string](), + configCache: typeutil.NewConcurrentMap[string, interface{}](), } + resetConfigCacheFunc := NewHandler("reset.config.cache", func(event *Event) { + keyToRemove := strings.NewReplacer("/", ".").Replace(event.Key) + manager.configCache.Remove(keyToRemove) + }) + manager.Dispatcher.RegisterForKeyPrefix("", resetConfigCacheFunc) + return manager +} + +func (m *Manager) GetCachedValue(key string) (interface{}, bool) { + return m.configCache.Get(key) +} + +func (m *Manager) SetCachedValue(key string, value interface{}) { + m.configCache.Insert(key, value) +} + +func (m *Manager) EvictCachedValue(key string) { + m.configCache.Remove(key) } func (m *Manager) GetConfig(key string) (string, error) { diff --git a/pkg/config/manager_test.go b/pkg/config/manager_test.go index bef70835ae..ef4c2290ab 100644 --- a/pkg/config/manager_test.go +++ b/pkg/config/manager_test.go @@ -146,32 +146,27 @@ func TestOnEvent(t *testing.T) { KeyPrefix: "test", RefreshInterval: 10 * time.Millisecond, })) - os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) time.Sleep(time.Second) value, err := mgr.GetConfig("a.b") assert.NoError(t, err) assert.Equal(t, value, "aaa") - ctx := context.Background() client.KV.Put(ctx, "test/config/a/b", "bbb") time.Sleep(time.Second) value, err = mgr.GetConfig("a.b") assert.NoError(t, err) assert.Equal(t, value, "bbb") - client.KV.Put(ctx, "test/config/a/b", "ccc") time.Sleep(time.Second) value, err = mgr.GetConfig("a.b") assert.NoError(t, err) assert.Equal(t, value, "ccc") - os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600) time.Sleep(time.Second) value, err = mgr.GetConfig("a.b") assert.NoError(t, err) assert.Equal(t, value, "ccc") - client.KV.Delete(ctx, "test/config/a/b") time.Sleep(time.Second) value, err = mgr.GetConfig("a.b") @@ -201,6 +196,61 @@ func TestDeadlock(t *testing.T) { wg.Wait() } +func TestCachedConfig(t *testing.T) { + cfg, _ := embed.ConfigFromFile("../../configs/advanced/etcd.yaml") + cfg.Dir = "/tmp/milvus/test" + e, err := embed.StartEtcd(cfg) + assert.NoError(t, err) + defer e.Close() + defer os.RemoveAll(cfg.Dir) + + dir, _ := os.MkdirTemp("", "milvus") + yamlFile := path.Join(dir, "milvus.yaml") + mgr, _ := Init(WithEnvSource(formatKey), + WithFilesSource(&FileInfo{ + Files: []string{yamlFile}, + RefreshInterval: 10 * time.Millisecond, + }), + WithEtcdSource(&EtcdInfo{ + Endpoints: []string{cfg.ACUrls[0].Host}, + KeyPrefix: "test", + RefreshInterval: 10 * time.Millisecond, + })) + // test get cached value from file + { + os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) + time.Sleep(time.Second) + _, exist := mgr.GetCachedValue("a.b") + assert.False(t, exist) + mgr.SetCachedValue("a.b", "aaa") + val, exist := mgr.GetCachedValue("a.b") + assert.True(t, exist) + assert.Equal(t, "aaa", val.(string)) + + // after refresh, the cached value should be reset + os.WriteFile(yamlFile, []byte("a.b: xxx"), 0o600) + time.Sleep(time.Second) + _, exist = mgr.GetCachedValue("a.b") + assert.False(t, exist) + } + client := v3client.New(e.Server) + { + _, exist := mgr.GetCachedValue("c.d") + assert.False(t, exist) + mgr.SetCachedValue("cd", "xxx") + val, exist := mgr.GetCachedValue("cd") + assert.True(t, exist) + assert.Equal(t, "xxx", val.(string)) + + // after refresh, the cached value should be reset + ctx := context.Background() + client.KV.Put(ctx, "test/config/c/d", "www") + time.Sleep(time.Second) + _, exist = mgr.GetCachedValue("cd") + assert.False(t, exist) + } +} + type ErrSource struct{} func (e ErrSource) Close() { diff --git a/pkg/util/paramtable/base_table.go b/pkg/util/paramtable/base_table.go index d68862d0ce..daa0b36351 100644 --- a/pkg/util/paramtable/base_table.go +++ b/pkg/util/paramtable/base_table.go @@ -253,12 +253,14 @@ func (bt *BaseTable) GetWithDefault(key, defaultValue string) string { // Remove Config by key func (bt *BaseTable) Remove(key string) error { bt.mgr.DeleteConfig(key) + bt.mgr.EvictCachedValue(key) return nil } // Update Config func (bt *BaseTable) Save(key, value string) error { bt.mgr.SetConfig(key, value) + bt.mgr.EvictCachedValue(key) return nil } @@ -272,5 +274,6 @@ func (bt *BaseTable) SaveGroup(group map[string]string) error { // Reset Config to default value func (bt *BaseTable) Reset(key string) error { bt.mgr.ResetConfig(key) + bt.mgr.EvictCachedValue(key) return nil } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 80295c4d54..2de2f87ddf 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1952,7 +1952,7 @@ type queryNodeConfig struct { InterimIndexNlist ParamItem `refreshable:"false"` InterimIndexNProbe ParamItem `refreshable:"false"` InterimIndexMemExpandRate ParamItem `refreshable:"false"` - InterimIndexBuildParallelRate ParamItem `refreshable:"true"` + InterimIndexBuildParallelRate ParamItem `refreshable:"false"` // memory limit LoadMemoryUsageFactor ParamItem `refreshable:"true"` diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index c2213e315d..17d68f8325 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -284,6 +284,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 1000, Params.SegmentCheckInterval.GetAsInt()) assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt()) + params.Save(Params.BalanceCheckInterval.Key, "10000") assert.Equal(t, 10000, Params.BalanceCheckInterval.GetAsInt()) assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt()) assert.Equal(t, 3, Params.CollectionRecoverTimesLimit.GetAsInt()) @@ -470,3 +471,32 @@ func TestForbiddenItem(t *testing.T) { }) assert.Equal(t, "by-dev", params.CommonCfg.ClusterPrefix.GetValue()) } + +func TestCachedParam(t *testing.T) { + Init() + params := Get() + + assert.True(t, params.IndexNodeCfg.EnableDisk.GetAsBool()) + assert.True(t, params.IndexNodeCfg.EnableDisk.GetAsBool()) + + assert.Equal(t, 256*1024*1024, params.QueryCoordGrpcServerCfg.ServerMaxRecvSize.GetAsInt()) + assert.Equal(t, 256*1024*1024, params.QueryCoordGrpcServerCfg.ServerMaxRecvSize.GetAsInt()) + + assert.Equal(t, int32(16), params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()) + assert.Equal(t, int32(16), params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()) + + assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint()) + assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint()) + + assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) + assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) + + assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64()) + assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64()) + + assert.Equal(t, 0.85, params.QuotaConfig.DataNodeMemoryLowWaterLevel.GetAsFloat()) + assert.Equal(t, 0.85, params.QuotaConfig.DataNodeMemoryLowWaterLevel.GetAsFloat()) + + assert.Equal(t, 1*time.Hour, params.DataCoordCfg.GCInterval.GetAsDuration(time.Second)) + assert.Equal(t, 1*time.Hour, params.DataCoordCfg.GCInterval.GetAsDuration(time.Second)) +} diff --git a/pkg/util/paramtable/grpc_param_test.go b/pkg/util/paramtable/grpc_param_test.go index b68e132731..1e9c30ac00 100644 --- a/pkg/util/paramtable/grpc_param_test.go +++ b/pkg/util/paramtable/grpc_param_test.go @@ -128,23 +128,23 @@ func TestGrpcClientParams(t *testing.T) { base.Save("grpc.client.maxMaxAttempts", "4") assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), 4) - assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), DefaultInitialBackoff) - base.Save("grpc.client.initialBackOff", "a") - assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), DefaultInitialBackoff) - base.Save("grpc.client.initialBackOff", "2.0") - assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), 2.0) + assert.Equal(t, DefaultInitialBackoff, clientConfig.InitialBackoff.GetAsFloat()) + base.Save(clientConfig.InitialBackoff.Key, "a") + assert.Equal(t, DefaultInitialBackoff, clientConfig.InitialBackoff.GetAsFloat()) + base.Save(clientConfig.InitialBackoff.Key, "2.0") + assert.Equal(t, 2.0, clientConfig.InitialBackoff.GetAsFloat()) assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), DefaultMaxBackoff) - base.Save("grpc.client.maxBackOff", "a") + base.Save(clientConfig.MaxBackoff.Key, "a") assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), DefaultMaxBackoff) - base.Save("grpc.client.maxBackOff", "50.0") - assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), 50.0) + base.Save(clientConfig.MaxBackoff.Key, "50.0") + assert.Equal(t, 50.0, clientConfig.MaxBackoff.GetAsFloat()) assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled) base.Save("grpc.client.CompressionEnabled", "a") assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled) - base.Save("grpc.client.CompressionEnabled", "true") - assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), true) + base.Save(clientConfig.CompressionEnabled.Key, "true") + assert.Equal(t, true, clientConfig.CompressionEnabled.GetAsBool()) assert.Equal(t, clientConfig.MinResetInterval.GetValue(), "1000") base.Save("grpc.client.minResetInterval", "abc") diff --git a/pkg/util/paramtable/param_item.go b/pkg/util/paramtable/param_item.go index 4fe1c6f2fd..6c06fb27dd 100644 --- a/pkg/util/paramtable/param_item.go +++ b/pkg/util/paramtable/param_item.go @@ -85,6 +85,7 @@ func (pi *ParamItem) SwapTempValue(s string) *string { if s == "" { return pi.tempValue.Swap(nil) } + pi.manager.EvictCachedValue(pi.Key) return pi.tempValue.Swap(&s) } @@ -94,47 +95,124 @@ func (pi *ParamItem) GetValue() string { } func (pi *ParamItem) GetAsStrings() []string { - return getAsStrings(pi.GetValue()) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if strings, ok := val.([]string); ok { + return strings + } + } + realStrs := getAsStrings(pi.GetValue()) + pi.manager.SetCachedValue(pi.Key, realStrs) + return realStrs } func (pi *ParamItem) GetAsBool() bool { - return getAsBool(pi.GetValue()) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if boolVal, ok := val.(bool); ok { + return boolVal + } + } + boolVal := getAsBool(pi.GetValue()) + pi.manager.SetCachedValue(pi.Key, boolVal) + return boolVal } func (pi *ParamItem) GetAsInt() int { - return getAsInt(pi.GetValue()) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if intVal, ok := val.(int); ok { + return intVal + } + } + intVal := getAsInt(pi.GetValue()) + pi.manager.SetCachedValue(pi.Key, intVal) + return intVal } func (pi *ParamItem) GetAsInt32() int32 { - return int32(getAsInt64(pi.GetValue())) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if int32Val, ok := val.(int32); ok { + return int32Val + } + } + int32Val := int32(getAsInt64(pi.GetValue())) + pi.manager.SetCachedValue(pi.Key, int32Val) + return int32Val } func (pi *ParamItem) GetAsUint() uint { - return uint(getAsUint64(pi.GetValue())) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if uintVal, ok := val.(uint); ok { + return uintVal + } + } + uintVal := uint(getAsUint64(pi.GetValue())) + pi.manager.SetCachedValue(pi.Key, uintVal) + return uintVal } func (pi *ParamItem) GetAsUint32() uint32 { - return uint32(getAsUint64(pi.GetValue())) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if uint32Val, ok := val.(uint32); ok { + return uint32Val + } + } + uint32Val := uint32(getAsUint64(pi.GetValue())) + pi.manager.SetCachedValue(pi.Key, uint32Val) + return uint32Val } func (pi *ParamItem) GetAsUint64() uint64 { - return getAsUint64(pi.GetValue()) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if uint64Val, ok := val.(uint64); ok { + return uint64Val + } + } + uint64Val := getAsUint64(pi.GetValue()) + pi.manager.SetCachedValue(pi.Key, uint64Val) + return uint64Val } func (pi *ParamItem) GetAsUint16() uint16 { - return uint16(getAsUint64(pi.GetValue())) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if uint16Val, ok := val.(uint16); ok { + return uint16Val + } + } + uint16Val := uint16(getAsUint64(pi.GetValue())) + pi.manager.SetCachedValue(pi.Key, uint16Val) + return uint16Val } func (pi *ParamItem) GetAsInt64() int64 { - return getAsInt64(pi.GetValue()) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if int64Val, ok := val.(int64); ok { + return int64Val + } + } + int64Val := getAsInt64(pi.GetValue()) + pi.manager.SetCachedValue(pi.Key, int64Val) + return int64Val } func (pi *ParamItem) GetAsFloat() float64 { - return getAsFloat(pi.GetValue()) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if floatVal, ok := val.(float64); ok { + return floatVal + } + } + floatVal := getAsFloat(pi.GetValue()) + pi.manager.SetCachedValue(pi.Key, floatVal) + return floatVal } func (pi *ParamItem) GetAsDuration(unit time.Duration) time.Duration { - return getAsDuration(pi.GetValue(), unit) + if val, exist := pi.manager.GetCachedValue(pi.Key); exist { + if durationVal, ok := val.(time.Duration); ok { + return durationVal + } + } + durationVal := getAsDuration(pi.GetValue(), unit) + pi.manager.SetCachedValue(pi.Key, durationVal) + return durationVal } func (pi *ParamItem) GetAsJSONMap() map[string]string {