Add worker num as one of the load resources (#26045)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
yah01 2023-08-01 21:47:06 +08:00 committed by GitHub
parent 5c1f79dc54
commit 9c55a7f422
5 changed files with 96 additions and 128 deletions


@@ -32,7 +32,6 @@ import (
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@@ -41,8 +40,6 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
@@ -50,7 +47,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
@@ -75,6 +71,24 @@ type Loader interface {
LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo) error
}
type LoadResource struct {
MemorySize uint64
DiskSize uint64
WorkNum int
}
func (r *LoadResource) Add(resource LoadResource) {
r.MemorySize += resource.MemorySize
r.DiskSize += resource.DiskSize
r.WorkNum += resource.WorkNum
}
func (r *LoadResource) Sub(resource LoadResource) {
r.MemorySize -= resource.MemorySize
r.DiskSize -= resource.DiskSize
r.WorkNum -= resource.WorkNum
}
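As an aside (not part of the commit): a minimal, runnable sketch of the accounting these two helpers enable. Everything below the struct is invented for illustration; in the loader itself the counterpart is the committedResource field guarded by mut, shown further down.

package main

import "fmt"

type LoadResource struct {
	MemorySize uint64
	DiskSize   uint64
	WorkNum    int
}

func (r *LoadResource) Add(resource LoadResource) {
	r.MemorySize += resource.MemorySize
	r.DiskSize += resource.DiskSize
	r.WorkNum += resource.WorkNum
}

func (r *LoadResource) Sub(resource LoadResource) {
	r.MemorySize -= resource.MemorySize
	r.DiskSize -= resource.DiskSize
	r.WorkNum -= resource.WorkNum
}

func main() {
	var committed LoadResource
	request := LoadResource{MemorySize: 64 << 20, DiskSize: 128 << 20, WorkNum: 17}

	committed.Add(request) // commit the request before loading starts
	fmt.Printf("committed: %+v\n", committed)

	committed.Sub(request) // release it once loading finishes or fails
	fmt.Printf("committed: %+v\n", committed)
}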
func NewLoader(
manager *Manager,
cm storage.ChunkManager,
@@ -94,14 +108,11 @@ func NewLoader(
ioPoolSize = configPoolSize
}
ioPool := conc.NewPool[*storage.Blob](ioPoolSize, conc.WithPreAlloc(true))
log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
loader := &segmentLoader{
manager: manager,
cm: cm,
ioPool: ioPool,
loadingSegments: typeutil.NewConcurrentMap[int64, chan struct{}](),
}
@@ -112,13 +123,11 @@
type segmentLoader struct {
manager *Manager
cm storage.ChunkManager
ioPool *conc.Pool[*storage.Blob]
mut sync.Mutex
// The channel will be closed once the segment is loaded
loadingSegments *typeutil.ConcurrentMap[int64, chan struct{}]
committedMemSize uint64
committedDiskSize uint64
committedResource LoadResource
}
var _ Loader = (*segmentLoader)(nil)
@@ -146,11 +155,11 @@ func (loader *segmentLoader) Load(ctx context.Context,
log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))
// Check memory & storage limit
memUsage, diskUsage, concurrencyLevel, err := loader.requestResource(ctx, infos...)
resource, concurrencyLevel, err := loader.requestResource(ctx, infos...)
if err != nil {
return nil, err
}
defer loader.freeRequest(memUsage, diskUsage)
defer loader.freeRequest(resource)
newSegments := make(map[int64]*LocalSegment, len(infos))
clearAll := func() {
@@ -287,53 +296,68 @@ func (loader *segmentLoader) notifyLoadFinish(segments ...*querypb.SegmentLoadIn
// requestResource requests memory & storage to load segments,
// returns the requested load resource and the concurrency level allowed by the gained resource.
func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (uint64, uint64, int, error) {
func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (LoadResource, int, error) {
resource := LoadResource{}
memoryUsage := hardware.GetUsedMemoryCount()
totalMemory := hardware.GetMemoryCount()
diskUsage, err := GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
return resource, 0, errors.Wrap(err, "get local used size failed")
}
diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
loader.mut.Lock()
defer loader.mut.Unlock()
poolCap := runtime.NumCPU() * paramtable.Get().CommonCfg.ThreadCoreCoefficient.GetAsInt()
if loader.committedResource.WorkNum >= poolCap {
return resource, 0, merr.WrapErrServiceRequestLimitExceeded(int32(poolCap))
} else if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
return resource, 0, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
return resource, 0, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
}
concurrencyLevel := funcutil.Min(runtime.GOMAXPROCS(0), len(infos))
var memUsage, diskUsage uint64
for _, info := range infos {
logNum := 0
for _, field := range info.GetBinlogPaths() {
logNum += len(field.GetBinlogs())
resource.WorkNum += len(field.GetBinlogs())
}
if logNum > 0 {
// clamp concurrency so that logNum tasks per concurrent segment cannot exhaust the IO pool
concurrencyLevel = funcutil.Min(concurrencyLevel, funcutil.Max(loader.ioPool.Free()/logNum, 1))
for _, index := range info.GetIndexInfos() {
resource.WorkNum += len(index.IndexFilePaths)
}
for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
_, _, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel)
if err == nil {
break
}
}
mu, du, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel)
if err != nil {
log.Warn("no sufficient resource to load segments", zap.Error(err))
return 0, 0, 0, err
}
memUsage += mu
diskUsage += du
}
loader.committedMemSize += memUsage
loader.committedDiskSize += diskUsage
for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
_, _, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel)
if err == nil {
break
}
}
return memUsage, diskUsage, concurrencyLevel, nil
mu, du, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel)
if err != nil {
log.Warn("no sufficient resource to load segments", zap.Error(err))
return resource, 0, err
}
resource.MemorySize += mu
resource.DiskSize += du
loader.committedResource.Add(resource)
return resource, concurrencyLevel, nil
}
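To make the new WorkNum accounting concrete, a worked example with made-up numbers: a segment carrying 3 binlog fields of 4 binlogs each plus one index with 5 index files requests 3×4 + 5 = 17 work units, and requests are rejected outright once committedResource.WorkNum reaches poolCap (e.g. 8 CPUs with a ThreadCoreCoefficient of 10 gives a cap of 80). A standalone sketch of that arithmetic:

package main

import "fmt"

func main() {
	// Hypothetical segment: 3 binlog fields with 4 binlogs each, plus
	// one index with 5 index files (counts only, for illustration).
	binlogsPerField := []int{4, 4, 4} // stands in for len(field.GetBinlogs())
	indexFiles := 5                   // stands in for len(index.IndexFilePaths)

	workNum := 0
	for _, n := range binlogsPerField {
		workNum += n
	}
	workNum += indexFiles

	poolCap := 8 * 10 // NumCPU * ThreadCoreCoefficient, assumed values
	fmt.Printf("requesting %d of %d work units\n", workNum, poolCap) // 17 of 80
}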
// freeRequest releases the previously requested load resource.
func (loader *segmentLoader) freeRequest(memUsage, diskUsage uint64) {
func (loader *segmentLoader) freeRequest(resource LoadResource) {
loader.mut.Lock()
defer loader.mut.Unlock()
loader.committedMemSize -= memUsage
loader.committedDiskSize -= diskUsage
loader.committedResource.Sub(resource)
}
func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs ...int64) error {
@@ -383,7 +407,7 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI
log.Info("start loading remote...", zap.Int("segmentNum", segmentNum))
loadedBfs := NewConcurrentSet[*pkoracle.BloomFilterSet]()
loadedBfs := typeutil.NewConcurrentSet[*pkoracle.BloomFilterSet]()
// TODO check memory for bf size
loadRemoteFunc := func(idx int) error {
loadInfo := infos[idx]
@@ -546,30 +570,6 @@ func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segmen
return nil
}
// Load binlogs concurrently and asynchronously into memory from KV storage
func (loader *segmentLoader) loadFieldBinlogsAsync(ctx context.Context, field *datapb.FieldBinlog) []*conc.Future[*storage.Blob] {
futures := make([]*conc.Future[*storage.Blob], 0, len(field.Binlogs))
for i := range field.Binlogs {
path := field.Binlogs[i].GetLogPath()
future := loader.ioPool.Submit(func() (*storage.Blob, error) {
binLog, err := loader.cm.Read(ctx, path)
if err != nil {
log.Warn("failed to load binlog", zap.String("filePath", path), zap.Error(err))
return nil, err
}
blob := &storage.Blob{
Key: path,
Value: binLog,
}
return blob, nil
})
futures = append(futures, future)
}
return futures
}
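For readers unfamiliar with the conc pool the removed helper relied on: Submit schedules a task and returns a typed future that is awaited later. Rather than depend on that internal Milvus package here, the sketch below approximates the same fan-out pattern with only the standard library; blob and submit are stand-ins, not the real API.

package main

import "fmt"

// blob stands in for storage.Blob in the removed helper above.
type blob struct {
	Key   string
	Value []byte
}

// submit mimics pool.Submit: run fn in a goroutine and hand back a
// one-shot "future" channel carrying the result.
func submit(fn func() (*blob, error)) <-chan *blob {
	out := make(chan *blob, 1)
	go func() {
		b, err := fn()
		if err != nil {
			out <- nil // a real future would carry the error too
			return
		}
		out <- b
	}()
	return out
}

func main() {
	paths := []string{"binlog/1", "binlog/2", "binlog/3"}

	futures := make([]<-chan *blob, 0, len(paths))
	for _, path := range paths {
		path := path // capture per iteration, as the removed code did via Binlogs[i]
		futures = append(futures, submit(func() (*blob, error) {
			// A real implementation reads the path from the chunk manager.
			return &blob{Key: path, Value: []byte("data")}, nil
		}))
	}

	for _, f := range futures {
		if b := <-f; b != nil {
			fmt.Println("loaded", b.Key)
		}
	}
}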
func (loader *segmentLoader) loadFieldsIndex(ctx context.Context, segment *LocalSegment, vecFieldInfos map[int64]*IndexedFieldInfo) error {
for fieldID, fieldInfo := range vecFieldInfos {
indexInfo := fieldInfo.IndexInfo
@@ -610,58 +610,6 @@ func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalS
return segment.LoadIndex(indexInfo, fieldType)
}
func (loader *segmentLoader) insertIntoSegment(segment *LocalSegment,
rowIDs []UniqueID,
timestamps []Timestamp,
insertData *storage.InsertData) error {
rowNum := len(rowIDs)
if rowNum != len(timestamps) || insertData == nil {
return errors.New(fmt.Sprintln("illegal insert data when loading segment, collectionID = ", segment.collectionID))
}
log := log.With(
zap.Int64("collectionID", segment.Collection()),
zap.Int64("segmentID", segment.ID()),
)
log.Info("start load growing segments...", zap.Int("rowNum", len(rowIDs)))
// 1. update bloom filter
insertRecord, err := storage.TransferInsertDataToInsertRecord(insertData)
if err != nil {
return err
}
insertMsg := &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionID: segment.collectionID,
Timestamps: timestamps,
RowIDs: rowIDs,
NumRows: uint64(rowNum),
FieldsData: insertRecord.FieldsData,
Version: msgpb.InsertDataVersion_ColumnBased,
},
}
collection := loader.manager.Collection.Get(segment.Collection())
if collection == nil {
err := merr.WrapErrCollectionNotFound(segment.Collection())
log.Warn("failed to get collection while inserting data into segment", zap.Error(err))
return err
}
pks, err := GetPrimaryKeys(insertMsg, collection.Schema())
if err != nil {
return err
}
segment.bloomFilterSet.UpdateBloomFilter(pks)
// 2. do insert
err = segment.Insert(rowIDs, timestamps, insertRecord)
if err != nil {
return err
}
log.Info("Do insert done for growing segment", zap.Int("rowNum", rowNum))
return nil
}
func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet,
binlogPaths []string, logType storage.StatsLogType) error {
@@ -810,7 +758,7 @@ func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *Loca
}
// JoinIDPath joins ids to path format.
func JoinIDPath(ids ...UniqueID) string {
func JoinIDPath(ids ...int64) string {
idStr := make([]string, 0, len(ids))
for _, id := range ids {
idStr = append(idStr, strconv.FormatInt(id, 10))
@@ -844,7 +792,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
zap.Int64("collectionID", segmentLoadInfos[0].GetCollectionID()),
)
memUsage := hardware.GetUsedMemoryCount() + loader.committedMemSize
memUsage := hardware.GetUsedMemoryCount() + loader.committedResource.MemorySize
totalMem := hardware.GetMemoryCount()
if memUsage == 0 || totalMem == 0 {
return 0, 0, errors.New("get memory failed when checkSegmentSize")
@@ -854,7 +802,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
if err != nil {
return 0, 0, errors.Wrap(err, "get local used size failed")
}
diskUsage := uint64(localDiskUsage) + loader.committedDiskSize
diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
maxSegmentSize := uint64(0)
predictMemUsage := memUsage
@@ -913,9 +861,9 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
log.Info("predict memory and disk usage while loading (in MiB)",
zap.Uint64("maxSegmentSize", toMB(maxSegmentSize)),
zap.Int("concurrency", concurrency),
zap.Uint64("committedMemSize", toMB(loader.committedMemSize)),
zap.Uint64("committedMemSize", toMB(loader.committedResource.MemorySize)),
zap.Uint64("memUsage", toMB(memUsage)),
zap.Uint64("committedDiskSize", toMB(loader.committedDiskSize)),
zap.Uint64("committedDiskSize", toMB(loader.committedResource.DiskSize)),
zap.Uint64("diskUsage", toMB(diskUsage)),
zap.Uint64("predictMemUsage", toMB(predictMemUsage)),
zap.Uint64("predictDiskUsage", toMB(predictDiskUsage)),
@@ -985,11 +933,11 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen
info.Statslogs = nil
return info
})
memUsage, diskUsage, _, err := loader.requestResource(ctx, indexInfo...)
resource, _, err := loader.requestResource(ctx, indexInfo...)
if err != nil {
return err
}
defer loader.freeRequest(memUsage, diskUsage)
defer loader.freeRequest(resource)
log.Info("segment loader start to load index", zap.Int("segmentNumAfterFilter", len(infos)))


@@ -54,6 +54,7 @@ var (
ErrServiceRequestLimitExceeded = newMilvusError("request limit exceeded", 4, true)
ErrServiceInternal = newMilvusError("service internal error", 5, false) // Never return this error out of Milvus
ErrCrossClusterRouting = newMilvusError("cross cluster routing", 6, false)
ErrServiceDiskLimitExceeded = newMilvusError("disk limit exceeded", 7, false)
// Collection related
ErrCollectionNotFound = newMilvusError("collection not found", 100, false)


@@ -66,6 +66,7 @@ func (s *ErrSuite) TestWrap() {
s.ErrorIs(WrapErrServiceRequestLimitExceeded(100, "too many requests"), ErrServiceRequestLimitExceeded)
s.ErrorIs(WrapErrServiceInternal("never throw out"), ErrServiceInternal)
s.ErrorIs(WrapErrCrossClusterRouting("ins-0", "ins-1"), ErrCrossClusterRouting)
s.ErrorIs(WrapErrServiceDiskLimitExceeded(110, 100, "DLE"), ErrServiceDiskLimitExceeded)
// Collection related
s.ErrorIs(WrapErrCollectionNotFound("test_collection", "failed to get collection"), ErrCollectionNotFound)


@@ -176,6 +176,14 @@ func WrapErrCrossClusterRouting(expectedCluster, actualCluster string, msg ...st
return err
}
func WrapErrServiceDiskLimitExceeded(predict, limit float32, msg ...string) error {
err := errors.Wrapf(ErrServiceDiskLimitExceeded, "predict=%v, limit=%v", predict, limit)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
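A usage sketch (assumed, not from the diff): callers detect this condition with errors.Is against the sentinel, which is what the new s.ErrorIs test case above asserts. The standard-library stand-ins below only mirror the shape of the merr helpers; the real sentinel is built by newMilvusError with a code and a retriable flag.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// Stand-in for merr.ErrServiceDiskLimitExceeded.
var errServiceDiskLimitExceeded = errors.New("disk limit exceeded")

func wrapErrServiceDiskLimitExceeded(predict, limit float32, msg ...string) error {
	err := fmt.Errorf("%w: predict=%v, limit=%v", errServiceDiskLimitExceeded, predict, limit)
	if len(msg) > 0 {
		err = fmt.Errorf("%s: %w", strings.Join(msg, "; "), err)
	}
	return err
}

func main() {
	err := wrapErrServiceDiskLimitExceeded(110, 100, "loading segment")
	fmt.Println(err)                                         // loading segment: disk limit exceeded: predict=110, limit=100
	fmt.Println(errors.Is(err, errServiceDiskLimitExceeded)) // true
}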
func WrapErrDatabaseNotFound(database any, msg ...string) error {
err := wrapWithField(ErrDatabaseNotfound, "database", database)
if len(msg) > 0 {


@@ -110,15 +110,19 @@ func (pi *ParamItem) GetAsInt32() int32 {
}
func (pi *ParamItem) GetAsUint() uint {
return uint(getAsInt64(pi.GetValue()))
return uint(getAsUint64(pi.GetValue()))
}
func (pi *ParamItem) GetAsUint32() uint32 {
return uint32(getAsInt64(pi.GetValue()))
return uint32(getAsUint64(pi.GetValue()))
}
func (pi *ParamItem) GetAsUint64() uint64 {
return getAsUint64(pi.GetValue())
}
func (pi *ParamItem) GetAsUint16() uint16 {
return uint16(getAsInt64(pi.GetValue()))
return uint16(getAsUint64(pi.GetValue()))
}
func (pi *ParamItem) GetAsInt64() int64 {
@@ -193,6 +197,12 @@ func getAsInt64(v string) int64 {
}, 0)
}
func getAsUint64(v string) uint64 {
return getAndConvert(v, func(value string) (uint64, error) {
return strconv.ParseUint(value, 10, 64)
}, 0)
}
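The switch from getAsInt64 to the new getAsUint64 in the getters above matters for values beyond math.MaxInt64, such as a very large DiskCapacityLimit read via GetAsUint64: ParseInt rejects them, so the old path silently fell back to the 0 default, while ParseUint covers the full uint64 range. A standalone demonstration:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// A valid uint64 that is larger than math.MaxInt64.
	v := "18446744073709551615"

	_, err := strconv.ParseInt(v, 10, 64)
	fmt.Println("ParseInt: ", err) // out-of-range error: the old getAsInt64 path returned the fallback 0

	u, err := strconv.ParseUint(v, 10, 64)
	fmt.Println("ParseUint:", u, err) // 18446744073709551615 <nil>
}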
func getAsFloat(v string) float64 {
return getAndConvert(v, func(value string) (float64, error) {
return strconv.ParseFloat(value, 64)