Add goroutine pool for creating index (#20391)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2022-11-08 19:41:02 +08:00 committed by GitHub
parent 396f3e408b
commit 889eb02172
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 12 deletions

View File

@ -207,6 +207,8 @@ func (i *IndexCoord) Init() error {
initErr = err
return
}
log.Info("IndexCoord get node sessions from etcd", zap.Bool("bind mode", Params.IndexCoordCfg.BindIndexNodeMode),
zap.String("node address", Params.IndexCoordCfg.IndexNodeAddress))
aliveNodeID := make([]UniqueID, 0)
if Params.IndexCoordCfg.BindIndexNodeMode {
if err = i.nodeManager.AddNode(Params.IndexCoordCfg.IndexNodeID, Params.IndexCoordCfg.IndexNodeAddress); err != nil {

View File

@ -29,15 +29,18 @@ import (
"context"
"errors"
"io"
"math"
"math/rand"
"os"
"path"
"runtime"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
"github.com/panjf2000/ants/v2"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
@ -47,6 +50,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/initcore"
@ -97,6 +101,8 @@ type IndexNode struct {
initOnce sync.Once
stateLock sync.Mutex
tasks map[taskKey]*taskInfo
cgoPool *concurrency.Pool
}
// NewIndexNode creates a new IndexNode component.
@ -198,6 +204,27 @@ func (i *IndexNode) Init() error {
i.closer = trace.InitTracing("index_node")
i.initKnowhere()
// IndexNode will not execute tasks concurrently, so the size of goroutines pool is 1.
i.cgoPool, err = concurrency.NewPool(1, ants.WithPreAlloc(true),
ants.WithExpiryDuration(math.MaxInt64))
if err != nil {
log.Error("IndexNode init cgo pool failed", zap.Error(err))
initErr = err
return
}
sig := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
i.cgoPool.Submit(func() (interface{}, error) {
runtime.LockOSThread()
wg.Done()
<-sig
return nil, nil
})
wg.Wait()
close(sig)
})
log.Info("Init IndexNode finished", zap.Error(initErr))

View File

@ -94,6 +94,7 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
nodeID: i.GetNodeID(),
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
serializedSize: 0,
pool: i.cgoPool,
}
ret := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,

View File

@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
@ -100,6 +101,7 @@ type indexBuildTask struct {
tr *timerecord.TimeRecorder
statistic indexpb.JobInfo
node *IndexNode
pool *concurrency.Pool
}
func (it *indexBuildTask) Reset() {
@ -247,13 +249,18 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
dType := dataset.DType
var err error
if dType != schemapb.DataType_None {
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig())
if err != nil {
log.Ctx(ctx).Error("failed to create index", zap.Error(err))
return err
}
_, err = it.pool.Submit(func() (interface{}, error) {
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig())
if err != nil {
return nil, err
}
err = it.index.Build(dataset)
err = it.index.Build(dataset)
if err != nil {
return nil, err
}
return nil, nil
}).Await()
if err != nil {
log.Ctx(ctx).Error("failed to build index", zap.Error(err))
return err
@ -358,13 +365,19 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
zap.Int64("buildID", it.BuildID),
zap.String("index params", string(jsonIndexParams)))
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig())
if err != nil {
log.Ctx(ctx).Error("failed to create index", zap.Error(err))
return err
}
_, err = it.pool.Submit(func() (interface{}, error) {
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig())
if err != nil {
log.Ctx(ctx).Error("failed to create index", zap.Error(err))
return nil, err
}
err = it.index.Build(dataset)
err = it.index.Build(dataset)
if err != nil {
return nil, err
}
return nil, nil
}).Await()
if err != nil {
if it.index.CleanLocalData() != nil {
log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",