mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
enhance: add lazy load retry configurations (#32848)
Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
parent
c35797c399
commit
1e1fba0588
11
pkg/util/cache/cache.go
vendored
11
pkg/util/cache/cache.go
vendored
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
"github.com/milvus-io/milvus/pkg/util/syncutil"
|
"github.com/milvus-io/milvus/pkg/util/syncutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -348,18 +349,20 @@ func (c *lruCache[K, V]) getAndPin(ctx context.Context, key K) (*cacheItem[K, V]
|
|||||||
}
|
}
|
||||||
timer := time.Now()
|
timer := time.Now()
|
||||||
value, err := c.loader(ctx, key)
|
value, err := c.loader(ctx, key)
|
||||||
c.stats.TotalLoadTimeMs.Add(uint64(time.Since(timer).Milliseconds()))
|
|
||||||
// Try to evict one item if there is not enough disk space, then retry.
|
for retryAttempt := 0; merr.ErrServiceDiskLimitExceeded.Is(err) && retryAttempt < paramtable.Get().QueryNodeCfg.LazyLoadMaxRetryTimes.GetAsInt(); retryAttempt++ {
|
||||||
if merr.ErrServiceDiskLimitExceeded.Is(err) {
|
// Try to evict one item if there is not enough disk space, then retry.
|
||||||
c.evictItems(ctx, 1)
|
c.evictItems(ctx, paramtable.Get().QueryNodeCfg.LazyLoadMaxEvictPerRetry.GetAsInt())
|
||||||
value, err = c.loader(ctx, key)
|
value, err = c.loader(ctx, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.stats.LoadFailCount.Inc()
|
c.stats.LoadFailCount.Inc()
|
||||||
log.Debug("loader failed for key", zap.Any("key", key))
|
log.Debug("loader failed for key", zap.Any("key", key))
|
||||||
return nil, true, err
|
return nil, true, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.stats.TotalLoadTimeMs.Add(uint64(time.Since(timer).Milliseconds()))
|
||||||
c.stats.LoadSuccessCount.Inc()
|
c.stats.LoadSuccessCount.Inc()
|
||||||
item, err := c.setAndPin(ctx, key, value)
|
item, err := c.setAndPin(ctx, key, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2007,6 +2007,8 @@ type queryNodeConfig struct {
|
|||||||
LazyLoadWaitTimeout ParamItem `refreshable:"true"`
|
LazyLoadWaitTimeout ParamItem `refreshable:"true"`
|
||||||
LazyLoadRequestResourceTimeout ParamItem `refreshable:"true"`
|
LazyLoadRequestResourceTimeout ParamItem `refreshable:"true"`
|
||||||
LazyLoadRequestResourceRetryInterval ParamItem `refreshable:"true"`
|
LazyLoadRequestResourceRetryInterval ParamItem `refreshable:"true"`
|
||||||
|
LazyLoadMaxRetryTimes ParamItem `refreshable:"true"`
|
||||||
|
LazyLoadMaxEvictPerRetry ParamItem `refreshable:"true"`
|
||||||
|
|
||||||
// chunk cache
|
// chunk cache
|
||||||
ReadAheadPolicy ParamItem `refreshable:"false"`
|
ReadAheadPolicy ParamItem `refreshable:"false"`
|
||||||
@ -2275,6 +2277,24 @@ func (p *queryNodeConfig) init(base *BaseTable) {
|
|||||||
}
|
}
|
||||||
p.LazyLoadRequestResourceRetryInterval.Init(base.mgr)
|
p.LazyLoadRequestResourceRetryInterval.Init(base.mgr)
|
||||||
|
|
||||||
|
p.LazyLoadMaxRetryTimes = ParamItem{
|
||||||
|
Key: "queryNode.lazyLoadMaxRetryTimes",
|
||||||
|
Version: "2.4.2",
|
||||||
|
DefaultValue: "1",
|
||||||
|
Doc: "max retry times for lazy load, 1 by default",
|
||||||
|
Export: true,
|
||||||
|
}
|
||||||
|
p.LazyLoadMaxRetryTimes.Init(base.mgr)
|
||||||
|
|
||||||
|
p.LazyLoadMaxEvictPerRetry = ParamItem{
|
||||||
|
Key: "queryNode.lazyLoadMaxEvictPerRetry",
|
||||||
|
Version: "2.4.2",
|
||||||
|
DefaultValue: "1",
|
||||||
|
Doc: "max evict count for lazy load, 1 by default",
|
||||||
|
Export: true,
|
||||||
|
}
|
||||||
|
p.LazyLoadMaxEvictPerRetry.Init(base.mgr)
|
||||||
|
|
||||||
p.ReadAheadPolicy = ParamItem{
|
p.ReadAheadPolicy = ParamItem{
|
||||||
Key: "queryNode.cache.readAheadPolicy",
|
Key: "queryNode.cache.readAheadPolicy",
|
||||||
Version: "2.3.2",
|
Version: "2.3.2",
|
||||||
|
Loading…
Reference in New Issue
Block a user