enhance: Optimized the GC logic to ensure that memory is released in time (#34949)

issue: #34703

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2024-07-28 23:53:47 +08:00 committed by GitHub
parent 08fa51d4f4
commit 2372452fac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 80 additions and 13 deletions

View File

@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"path/filepath"
"runtime/debug"
"strings"
"sync"
"syscall"
@ -48,6 +49,7 @@ import (
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/gc"
"github.com/milvus-io/milvus/pkg/util/generic"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -375,6 +377,18 @@ func (mr *MilvusRoles) Run() {
http.ServeHTTP()
setupPrometheusHTTPServer(Registry)
if paramtable.Get().CommonCfg.GCEnabled.GetAsBool() {
if paramtable.Get().CommonCfg.GCHelperEnabled.GetAsBool() {
action := func(GOGC uint32) {
debug.SetGCPercent(int(GOGC))
}
gc.NewTuner(paramtable.Get().CommonCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
} else {
action := func(uint32) {}
gc.NewTuner(paramtable.Get().CommonCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
}
}
var wg sync.WaitGroup
local := mr.Local

View File

@ -22,6 +22,8 @@ import (
sio "io"
"math"
"path"
"runtime"
"runtime/debug"
"sort"
"strconv"
"strings"
@ -455,7 +457,9 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 {
var totalBufferSize int64 = 0
for _, buffer := range t.clusterBuffers {
t.clusterBufferLocks.RLock(buffer.id)
totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) + buffer.bufferMemorySize.Load()
t.clusterBufferLocks.RUnlock(buffer.id)
}
return totalBufferSize
}
@ -554,6 +558,7 @@ func (t *clusteringCompactionTask) mappingSegment(
err := pkIter.Next()
if err != nil {
if err == sio.EOF {
pkIter.Close()
break
} else {
log.Warn("compact wrong, failed to iter through data", zap.Error(err))
@ -932,6 +937,10 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus
return err
}
}
writer = nil
runtime.GC()
debug.FreeOSMemory()
log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()),
zap.Int64("cost", time.Since(start).Milliseconds()))
return nil

View File

@ -34,7 +34,6 @@ import (
"path"
"path/filepath"
"plugin"
"runtime/debug"
"strings"
"sync"
"time"
@ -64,7 +63,6 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/gc"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/lock"
@ -366,17 +364,6 @@ func (node *QueryNode) Init() error {
initError = err
return
}
if paramtable.Get().QueryNodeCfg.GCEnabled.GetAsBool() {
if paramtable.Get().QueryNodeCfg.GCHelperEnabled.GetAsBool() {
action := func(GOGC uint32) {
debug.SetGCPercent(int(GOGC))
}
gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
} else {
action := func(uint32) {}
gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
}
}
log.Info("query node init successfully",
zap.Int64("queryNodeID", node.GetNodeID()),

View File

@ -270,6 +270,12 @@ type commonConfig struct {
UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"`
UseVectorAsClusteringKey ParamItem `refreshable:"true"`
EnableVectorClusteringKey ParamItem `refreshable:"true"`
GCEnabled ParamItem `refreshable:"false"`
GCHelperEnabled ParamItem `refreshable:"false"`
OverloadedMemoryThresholdPercentage ParamItem `refreshable:"false"`
MaximumGOGCConfig ParamItem `refreshable:"false"`
MinimumGOGCConfig ParamItem `refreshable:"false"`
}
func (p *commonConfig) init(base *BaseTable) {
@ -828,6 +834,45 @@ like the old password verification when updating the credential`,
Export: true,
}
p.EnableVectorClusteringKey.Init(base.mgr)
p.GCEnabled = ParamItem{
Key: "common.gcenabled",
Version: "2.4.7",
DefaultValue: "true",
}
p.GCEnabled.Init(base.mgr)
p.GCHelperEnabled = ParamItem{
Key: "common.gchelper.enabled",
Version: "2.4.7",
DefaultValue: "true",
}
p.GCHelperEnabled.Init(base.mgr)
p.OverloadedMemoryThresholdPercentage = ParamItem{
Key: "common.overloadedMemoryThresholdPercentage",
Version: "2.4.7",
DefaultValue: "90",
PanicIfEmpty: true,
Formatter: func(v string) string {
return fmt.Sprintf("%f", getAsFloat(v)/100)
},
}
p.OverloadedMemoryThresholdPercentage.Init(base.mgr)
p.MaximumGOGCConfig = ParamItem{
Key: "common.gchelper.maximumGoGC",
Version: "2.4.7",
DefaultValue: "200",
}
p.MaximumGOGCConfig.Init(base.mgr)
p.MinimumGOGCConfig = ParamItem{
Key: "common.gchelper.minimumGoGC",
Version: "2.4.7",
DefaultValue: "30",
}
p.MinimumGOGCConfig.Init(base.mgr)
}
type gpuConfig struct {

View File

@ -119,6 +119,17 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, []string{"timeticker"}, Params.TimeTicker.GetAsStrings())
assert.Equal(t, 1000, params.CommonCfg.BloomFilterApplyBatchSize.GetAsInt())
params.Save("common.gcenabled", "false")
assert.False(t, Params.GCEnabled.GetAsBool())
params.Save("common.gchelper.enabled", "false")
assert.False(t, Params.GCHelperEnabled.GetAsBool())
params.Save("common.overloadedMemoryThresholdPercentage", "40")
assert.Equal(t, 0.4, Params.OverloadedMemoryThresholdPercentage.GetAsFloat())
params.Save("common.gchelper.maximumGoGC", "100")
assert.Equal(t, 100, Params.MaximumGOGCConfig.GetAsInt())
params.Save("common.gchelper.minimumGoGC", "80")
assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt())
})
t.Run("test rootCoordConfig", func(t *testing.T) {

View File

@ -210,6 +210,7 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() {
s.NoError(err)
s.Equal(segsInfoResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
for _, segInfo := range segsInfoResp.GetInfos() {
s.LessOrEqual(segInfo.GetNumOfRows(), int64(1024*1024/128))
totalRows += segInfo.GetNumOfRows()
}