2021-11-01 22:51:41 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 13:50:12 +08:00
// with the License. You may obtain a copy of the License at
//
2021-11-01 22:51:41 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 13:50:12 +08:00
//
2021-11-01 22:51:41 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 13:50:12 +08:00
2021-01-15 14:38:36 +08:00
package indexnode
2020-12-10 17:55:55 +08:00
import (
"context"
2021-03-08 10:09:48 +08:00
"errors"
2021-06-30 19:46:14 +08:00
"fmt"
2021-09-26 19:19:57 +08:00
"path"
2021-03-26 11:19:02 +08:00
"runtime"
2022-01-13 20:27:34 +08:00
"runtime/debug"
2020-12-22 08:14:36 +08:00
"strconv"
2020-12-13 06:48:05 +08:00
2021-03-10 09:56:09 +08:00
"go.uber.org/zap"
2021-12-31 16:11:24 +08:00
"github.com/golang/protobuf/proto"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/kv"
2021-05-27 22:24:29 +08:00
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/funcutil"
2021-06-30 19:46:14 +08:00
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/timerecord"
2021-07-22 11:40:11 +08:00
"github.com/milvus-io/milvus/internal/util/trace"
2020-12-10 17:55:55 +08:00
)
2021-02-07 21:32:37 +08:00
// Constants shared by the index build task.
const (
	// IndexBuildTaskName is the name reported by Name for index build tasks.
	IndexBuildTaskName = "IndexBuildTask"

	// paramsKeyToParse is the parameter key whose value is itself a serialized
	// parameter map; it is parsed and its entries are merged into the result.
	paramsKeyToParse = "params"
)
2020-12-10 17:55:55 +08:00
type task interface {
2021-02-23 09:58:06 +08:00
Ctx ( ) context . Context
ID ( ) UniqueID // return ReqID
Name ( ) string
2020-12-10 17:55:55 +08:00
SetID ( uid UniqueID ) // set ReqID
2021-02-23 09:58:06 +08:00
PreExecute ( ctx context . Context ) error
Execute ( ctx context . Context ) error
PostExecute ( ctx context . Context ) error
2020-12-10 17:55:55 +08:00
WaitToFinish ( ) error
Notify ( err error )
2020-12-13 06:48:05 +08:00
OnEnqueue ( ) error
2021-02-27 10:45:03 +08:00
SetError ( err error )
2020-12-10 17:55:55 +08:00
}
2021-10-09 18:27:00 +08:00
// BaseTask is an basic instance of task.
2020-12-10 17:55:55 +08:00
type BaseTask struct {
2021-10-11 16:38:33 +08:00
done chan error
ctx context . Context
id UniqueID
err error
internalErr error
2021-02-27 10:45:03 +08:00
}
2021-10-09 18:27:00 +08:00
// SetError sets an error to task.
2021-02-27 10:45:03 +08:00
func ( bt * BaseTask ) SetError ( err error ) {
bt . err = err
2020-12-10 17:55:55 +08:00
}
2021-10-09 18:27:00 +08:00
// ID returns the id of index task.
2020-12-10 17:55:55 +08:00
func ( bt * BaseTask ) ID ( ) UniqueID {
return bt . id
}
2022-01-05 13:07:25 +08:00
// setID set the ID for the task.
2020-12-10 17:55:55 +08:00
func ( bt * BaseTask ) setID ( id UniqueID ) {
bt . id = id
}
2021-10-09 18:27:00 +08:00
// WaitToFinish will wait for the task to complete, if the context is done, it means that the execution of the task has timed out.
2020-12-10 17:55:55 +08:00
func ( bt * BaseTask ) WaitToFinish ( ) error {
2020-12-13 06:48:05 +08:00
select {
case <- bt . ctx . Done ( ) :
return errors . New ( "timeout" )
case err := <- bt . done :
return err
2020-12-10 17:55:55 +08:00
}
}
2021-10-09 18:27:00 +08:00
// Notify will notify WaitToFinish that the task is completed or failed.
2020-12-10 17:55:55 +08:00
func ( bt * BaseTask ) Notify ( err error ) {
bt . done <- err
}
2021-10-09 00:05:49 +08:00
// IndexBuildTask is used to record the information of the index tasks.
2021-01-26 09:38:40 +08:00
type IndexBuildTask struct {
2020-12-10 17:55:55 +08:00
BaseTask
2021-12-09 14:19:40 +08:00
index Index
kv kv . BaseKV
etcdKV * etcdkv . EtcdKV
savePaths [ ] string
req * indexpb . CreateIndexRequest
nodeID UniqueID
serializedSize uint64
2022-01-10 20:51:34 +08:00
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
newTypeParams map [ string ] string
newIndexParams map [ string ] string
tr * timerecord . TimeRecorder
2021-01-26 09:38:40 +08:00
}
2021-10-09 00:05:49 +08:00
// Ctx is the context of index tasks.
2021-02-23 09:58:06 +08:00
func ( it * IndexBuildTask ) Ctx ( ) context . Context {
return it . ctx
}
2021-10-09 00:05:49 +08:00
// ID returns the id of index task.
2021-02-23 09:58:06 +08:00
func ( it * IndexBuildTask ) ID ( ) UniqueID {
return it . id
2020-12-10 17:55:55 +08:00
}
2021-10-09 00:05:49 +08:00
// SetID sets the id for index task.
2021-01-26 09:38:40 +08:00
func ( it * IndexBuildTask ) SetID ( ID UniqueID ) {
2020-12-13 06:48:05 +08:00
it . BaseTask . setID ( ID )
}
2020-12-10 17:55:55 +08:00
2021-10-09 00:05:49 +08:00
// Name is the name of task to build index.
2021-02-23 09:58:06 +08:00
func ( bt * BaseTask ) Name ( ) string {
return IndexBuildTaskName
}
2021-10-09 00:05:49 +08:00
// OnEnqueue enqueues indexing tasks.
2021-01-26 09:38:40 +08:00
func ( it * IndexBuildTask ) OnEnqueue ( ) error {
2021-03-12 14:22:09 +08:00
it . SetID ( it . req . IndexBuildID )
2021-12-23 22:00:16 +08:00
log . Debug ( "IndexNode IndexBuilderTask Enqueue" , zap . Int64 ( "taskID" , it . ID ( ) ) , zap . Int64 ( "index buildID" , it . req . IndexBuildID ) )
2022-01-10 20:51:34 +08:00
it . tr = timerecord . NewTimeRecorder ( fmt . Sprintf ( "IndexBuildTask %d" , it . req . IndexBuildID ) )
2020-12-10 17:55:55 +08:00
return nil
}
2022-01-05 13:05:25 +08:00
// checkIndexMeta load meta from etcd to determine whether the task should continue execution.
2021-06-23 09:24:10 +08:00
func ( it * IndexBuildTask ) checkIndexMeta ( ctx context . Context , pre bool ) error {
2021-05-27 22:24:29 +08:00
fn := func ( ) error {
2021-10-13 23:24:32 +08:00
//TODO error handling need to be optimized, return Unrecoverable to avoid retry
2021-05-27 22:24:29 +08:00
indexMeta := indexpb . IndexMeta { }
_ , values , versions , err := it . etcdKV . LoadWithPrefix2 ( it . req . MetaPath )
if err != nil {
2021-09-26 21:22:04 +08:00
log . Error ( "IndexNode checkIndexMeta" , zap . Any ( "load meta error with path" , it . req . MetaPath ) ,
2021-06-06 09:41:35 +08:00
zap . Error ( err ) , zap . Any ( "pre" , pre ) )
2021-05-27 22:24:29 +08:00
return err
}
2021-09-29 20:26:00 +08:00
if len ( values ) == 0 {
2021-12-23 14:28:25 +08:00
return fmt . Errorf ( "IndexNode checkIndexMeta the indexMeta is empty" )
2021-09-29 20:26:00 +08:00
}
2021-06-06 09:41:35 +08:00
log . Debug ( "IndexNode checkIndexMeta load meta success" , zap . Any ( "path" , it . req . MetaPath ) , zap . Any ( "pre" , pre ) )
2021-09-29 20:26:00 +08:00
err = proto . Unmarshal ( [ ] byte ( values [ 0 ] ) , & indexMeta )
2021-05-27 22:24:29 +08:00
if err != nil {
2021-10-13 23:24:32 +08:00
log . Error ( "IndexNode failed to unmarshal index meta" , zap . Error ( err ) )
2021-05-27 22:24:29 +08:00
return err
}
2021-06-06 09:41:35 +08:00
log . Debug ( "IndexNode checkIndexMeta Unmarshal success" , zap . Any ( "IndexMeta" , indexMeta ) )
2021-05-27 22:24:29 +08:00
if indexMeta . Version > it . req . Version || indexMeta . State == commonpb . IndexState_Finished {
2021-10-13 23:24:32 +08:00
log . Info ( "IndexNode checkIndexMeta version mismatch" ,
zap . Any ( "req version" , it . req . Version ) ,
zap . Any ( "index meta version" , indexMeta . Version ) )
return nil
2021-05-27 22:24:29 +08:00
}
if indexMeta . MarkDeleted {
indexMeta . State = commonpb . IndexState_Finished
2021-09-29 20:26:00 +08:00
v , err := proto . Marshal ( & indexMeta )
if err != nil {
return err
}
err = it . etcdKV . CompareVersionAndSwap ( it . req . MetaPath , versions [ 0 ] , string ( v ) )
2021-05-27 22:24:29 +08:00
if err != nil {
return err
}
2021-07-23 10:44:12 +08:00
errMsg := fmt . Sprintf ( "the index has been deleted with indexBuildID %d" , indexMeta . IndexBuildID )
log . Warn ( errMsg )
return fmt . Errorf ( errMsg )
2021-05-27 22:24:29 +08:00
}
if pre {
return nil
}
indexMeta . IndexFilePaths = it . savePaths
indexMeta . State = commonpb . IndexState_Finished
2021-12-09 14:19:40 +08:00
indexMeta . SerializeSize = it . serializedSize
2021-10-11 16:38:33 +08:00
// Under normal circumstances, it.err and it.internalErr will not be non-nil at the same time, but for the sake of insurance, the else judgment is added.
2021-05-27 22:24:29 +08:00
if it . err != nil {
2021-10-19 16:24:35 +08:00
log . Error ( "IndexNode CreateIndex failed and can not be retried" , zap . Int64 ( "IndexBuildID" , indexMeta . IndexBuildID ) , zap . Any ( "err" , it . err ) )
2021-05-27 22:24:29 +08:00
indexMeta . State = commonpb . IndexState_Failed
2021-07-29 14:47:22 +08:00
indexMeta . FailReason = it . err . Error ( )
2021-10-11 16:38:33 +08:00
} else if it . internalErr != nil {
2021-10-19 16:24:35 +08:00
log . Error ( "IndexNode CreateIndex failed, but it can retried" , zap . Int64 ( "IndexBuildID" , indexMeta . IndexBuildID ) , zap . Any ( "err" , it . internalErr ) )
2021-10-11 16:38:33 +08:00
indexMeta . State = commonpb . IndexState_Unissued
2021-05-27 22:24:29 +08:00
}
2021-10-11 16:38:33 +08:00
2021-07-23 10:44:12 +08:00
log . Debug ( "IndexNode" , zap . Int64 ( "indexBuildID" , indexMeta . IndexBuildID ) , zap . Any ( "IndexState" , indexMeta . State ) )
2021-09-29 20:26:00 +08:00
var metaValue [ ] byte
metaValue , err = proto . Marshal ( & indexMeta )
if err != nil {
2021-10-13 23:24:32 +08:00
log . Warn ( "IndexNode" , zap . Int64 ( "indexBuildID" , indexMeta . IndexBuildID ) , zap . Any ( "IndexState" , indexMeta . State ) ,
2021-09-29 20:26:00 +08:00
zap . Any ( "proto.Marshal failed:" , err ) )
return err
}
2021-10-13 23:24:32 +08:00
err = it . etcdKV . CompareVersionAndSwap ( it . req . MetaPath , versions [ 0 ] , string ( metaValue ) )
if err != nil {
log . Warn ( "IndexNode checkIndexMeta CompareVersionAndSwap" , zap . Error ( err ) )
}
2021-11-30 19:32:17 +08:00
return nil
2021-05-27 22:24:29 +08:00
}
2021-06-23 09:24:10 +08:00
err := retry . Do ( ctx , fn , retry . Attempts ( 3 ) )
2021-10-13 23:24:32 +08:00
if err != nil {
log . Error ( "IndexNode failed to checkIndexMeta" , zap . Error ( err ) )
}
2022-01-10 20:51:34 +08:00
msg := fmt . Sprintf ( "check index meta pre: %v" , pre )
it . tr . Record ( msg )
2021-05-27 22:24:29 +08:00
return err
}
2021-10-09 18:24:58 +08:00
// PreExecute does some checks before building the index, for example, whether the index has been deleted.
2021-02-23 09:58:06 +08:00
func ( it * IndexBuildTask ) PreExecute ( ctx context . Context ) error {
2021-10-13 23:24:32 +08:00
log . Debug ( "IndexNode IndexBuildTask preExecute..." , zap . Int64 ( "buildId" , it . req . IndexBuildID ) )
2021-07-22 11:40:11 +08:00
sp , ctx := trace . StartSpanFromContextWithOperationName ( ctx , "CreateIndex-PreExecute" )
defer sp . Finish ( )
2021-06-23 09:24:10 +08:00
return it . checkIndexMeta ( ctx , true )
2020-12-10 17:55:55 +08:00
}
2021-10-09 18:24:58 +08:00
// PostExecute does some checks after building the index, for example, whether the index has been deleted or
// whether the index task is up to date.
2021-02-23 09:58:06 +08:00
func ( it * IndexBuildTask ) PostExecute ( ctx context . Context ) error {
2021-10-13 23:24:32 +08:00
log . Debug ( "IndexNode IndexBuildTask PostExecute..." , zap . Int64 ( "buildId" , it . req . IndexBuildID ) )
2021-07-22 11:40:11 +08:00
sp , _ := trace . StartSpanFromContextWithOperationName ( ctx , "CreateIndex-PostExecute" )
defer sp . Finish ( )
2021-06-23 09:24:10 +08:00
return it . checkIndexMeta ( ctx , false )
2020-12-13 06:48:05 +08:00
}
2022-01-10 20:51:34 +08:00
func ( it * IndexBuildTask ) executePrepareParams ( ctx context . Context ) error {
2020-12-22 08:14:36 +08:00
typeParams := make ( map [ string ] string )
2021-03-12 14:22:09 +08:00
for _ , kvPair := range it . req . GetTypeParams ( ) {
2020-12-22 08:14:36 +08:00
key , value := kvPair . GetKey ( ) , kvPair . GetValue ( )
_ , ok := typeParams [ key ]
if ok {
return errors . New ( "duplicated key in type params" )
}
2021-02-07 21:32:37 +08:00
if key == paramsKeyToParse {
params , err := funcutil . ParseIndexParamsMap ( value )
if err != nil {
return err
}
for pk , pv := range params {
typeParams [ pk ] = pv
}
} else {
typeParams [ key ] = value
}
2020-12-22 08:14:36 +08:00
}
indexParams := make ( map [ string ] string )
2021-03-12 14:22:09 +08:00
for _ , kvPair := range it . req . GetIndexParams ( ) {
2020-12-22 08:14:36 +08:00
key , value := kvPair . GetKey ( ) , kvPair . GetValue ( )
_ , ok := indexParams [ key ]
if ok {
return errors . New ( "duplicated key in index params" )
}
2021-02-07 21:32:37 +08:00
if key == paramsKeyToParse {
params , err := funcutil . ParseIndexParamsMap ( value )
if err != nil {
return err
}
for pk , pv := range params {
indexParams [ pk ] = pv
}
} else {
indexParams [ key ] = value
}
2020-12-22 08:14:36 +08:00
}
2022-01-10 20:51:34 +08:00
it . newTypeParams = typeParams
it . newIndexParams = indexParams
return nil
}
2020-12-22 08:14:36 +08:00
2022-01-10 20:51:34 +08:00
func ( it * IndexBuildTask ) executeStepLoad ( ctx context . Context ) ( storage . FieldID , storage . FieldData , error ) {
2020-12-22 08:14:36 +08:00
getValueByPath := func ( path string ) ( [ ] byte , error ) {
data , err := it . kv . Load ( path )
if err != nil {
return nil , err
}
return [ ] byte ( data ) , nil
}
getBlobByPath := func ( path string ) ( * Blob , error ) {
value , err := getValueByPath ( path )
if err != nil {
return nil , err
}
return & Blob {
2022-01-10 20:51:34 +08:00
Key : path ,
2020-12-22 08:14:36 +08:00
Value : value ,
} , nil
}
2021-03-12 14:22:09 +08:00
toLoadDataPaths := it . req . GetDataPaths ( )
2021-03-26 11:19:02 +08:00
keys := make ( [ ] string , len ( toLoadDataPaths ) )
blobs := make ( [ ] * Blob , len ( toLoadDataPaths ) )
loadKey := func ( idx int ) error {
2022-01-10 20:51:34 +08:00
keys [ idx ] = toLoadDataPaths [ idx ]
2021-03-26 11:19:02 +08:00
blob , err := getBlobByPath ( toLoadDataPaths [ idx ] )
2020-12-22 08:14:36 +08:00
if err != nil {
return err
}
2021-03-26 11:19:02 +08:00
blobs [ idx ] = blob
return nil
}
2022-01-07 17:31:22 +08:00
// Use runtime.GOMAXPROCS(0) instead of runtime.NumCPU()
// to respect CPU quota of container/pod
// gomaxproc will be set by `automaxproc`, passing 0 will just retrieve the value
2022-01-10 20:51:34 +08:00
err := funcutil . ProcessFuncParallel ( len ( toLoadDataPaths ) , runtime . GOMAXPROCS ( 0 ) , loadKey , "loadKey" )
2021-03-26 11:19:02 +08:00
if err != nil {
2021-10-11 16:38:33 +08:00
log . Warn ( "loadKey from minio failed" , zap . Error ( err ) )
it . internalErr = err
// In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned.
2022-01-10 20:51:34 +08:00
return storage . InvalidUniqueID , nil , err
2020-12-22 08:14:36 +08:00
}
2022-01-10 20:51:34 +08:00
2021-10-13 23:24:32 +08:00
log . Debug ( "IndexNode load data success" , zap . Int64 ( "buildId" , it . req . IndexBuildID ) )
2022-01-10 20:51:34 +08:00
it . tr . Record ( "load vector data done" )
2020-12-22 08:14:36 +08:00
var insertCodec storage . InsertCodec
2022-01-10 20:51:34 +08:00
collectionID , partitionID , segmentID , insertData , err2 := insertCodec . DeserializeAll ( blobs )
2021-01-15 14:38:36 +08:00
if err2 != nil {
2022-01-10 20:51:34 +08:00
return storage . InvalidUniqueID , nil , err2
2021-01-15 14:38:36 +08:00
}
2020-12-22 08:14:36 +08:00
if len ( insertData . Data ) != 1 {
2022-01-10 20:51:34 +08:00
return storage . InvalidUniqueID , nil , errors . New ( "we expect only one field in deserialized insert data" )
2020-12-22 08:14:36 +08:00
}
2022-01-10 20:51:34 +08:00
it . collectionID = collectionID
it . partitionID = partitionID
it . segmentID = segmentID
2021-12-23 22:00:16 +08:00
log . Debug ( "IndexNode deserialize data success" ,
zap . Int64 ( "taskID" , it . ID ( ) ) ,
2022-01-10 20:51:34 +08:00
zap . Int64 ( "IndexID" , it . req . IndexID ) ,
2021-12-23 22:00:16 +08:00
zap . Int64 ( "index buildID" , it . req . IndexBuildID ) ,
2022-01-10 20:51:34 +08:00
zap . Int64 ( "collectionID" , it . collectionID ) ,
zap . Int64 ( "partitionID" , it . partitionID ) ,
zap . Int64 ( "segmentID" , it . segmentID ) )
it . tr . Record ( "deserialize vector data done" )
// we can ensure that there blobs are in one Field
var data storage . FieldData
var fieldID storage . FieldID
for fID , value := range insertData . Data {
data = value
fieldID = fID
break
}
return fieldID , data , nil
}
func ( it * IndexBuildTask ) executeStepBuild ( ctx context . Context ) ( [ ] * storage . Blob , error ) {
var fieldID storage . FieldID
{
var err error
var fieldData storage . FieldData
fieldID , fieldData , err = it . executeStepLoad ( ctx )
if err != nil {
return nil , err
}
floatVectorFieldData , fOk := fieldData . ( * storage . FloatVectorFieldData )
2021-01-07 15:39:20 +08:00
if fOk {
2022-01-10 20:51:34 +08:00
err := it . index . BuildFloatVecIndexWithoutIds ( floatVectorFieldData . Data )
2021-01-07 15:39:20 +08:00
if err != nil {
2021-06-06 09:41:35 +08:00
log . Error ( "IndexNode BuildFloatVecIndexWithoutIds failed" , zap . Error ( err ) )
2022-01-10 20:51:34 +08:00
return nil , err
2021-01-07 15:39:20 +08:00
}
2021-01-07 14:56:17 +08:00
}
2022-01-10 20:51:34 +08:00
binaryVectorFieldData , bOk := fieldData . ( * storage . BinaryVectorFieldData )
2021-01-07 15:39:20 +08:00
if bOk {
2022-01-10 20:51:34 +08:00
err := it . index . BuildBinaryVecIndexWithoutIds ( binaryVectorFieldData . Data )
2021-01-07 15:39:20 +08:00
if err != nil {
2021-06-06 09:41:35 +08:00
log . Error ( "IndexNode BuildBinaryVecIndexWithoutIds failed" , zap . Error ( err ) )
2021-01-07 15:39:20 +08:00
}
2022-01-10 20:51:34 +08:00
return nil , err
2021-01-07 15:39:20 +08:00
}
2021-01-07 16:13:28 +08:00
if ! fOk && ! bOk {
2022-01-10 20:51:34 +08:00
return nil , errors . New ( "we expect FloatVectorFieldData or BinaryVectorFieldData" )
2020-12-22 08:14:36 +08:00
}
2022-01-10 20:51:34 +08:00
it . tr . Record ( "build index done" )
}
2020-12-22 08:14:36 +08:00
2022-01-10 20:51:34 +08:00
indexBlobs , err := it . index . Serialize ( )
if err != nil {
log . Error ( "IndexNode index Serialize failed" , zap . Error ( err ) )
return nil , err
}
it . tr . Record ( "index serialize done" )
2020-12-22 08:14:36 +08:00
2022-01-10 20:51:34 +08:00
// early release index for gc, and we can ensure that Delete is idempotent.
if err := it . index . Delete ( ) ; err != nil {
log . Error ( "IndexNode IndexBuildTask Execute CIndexDelete failed" ,
zap . Int64 ( "buildId" , it . req . IndexBuildID ) ,
zap . Error ( err ) )
}
2021-09-26 19:19:57 +08:00
2022-01-10 20:51:34 +08:00
var serializedIndexBlobs [ ] * storage . Blob
codec := storage . NewIndexFileBinlogCodec ( )
serializedIndexBlobs , err = codec . Serialize (
it . req . IndexBuildID ,
it . req . Version ,
it . collectionID ,
it . partitionID ,
it . segmentID ,
fieldID ,
it . newIndexParams ,
it . req . IndexName ,
it . req . IndexID ,
indexBlobs ,
)
if err != nil {
return nil , err
}
it . tr . Record ( "index codec serialize done" )
return serializedIndexBlobs , nil
}
func ( it * IndexBuildTask ) executeSave ( ctx context . Context , blobs [ ] * storage . Blob ) error {
blobCnt := len ( blobs )
it . serializedSize = 0
for i := range blobs {
it . serializedSize += uint64 ( len ( blobs [ i ] . Value ) )
}
getSavePathByKey := func ( key string ) string {
return path . Join ( Params . IndexNodeCfg . IndexStorageRootPath , strconv . Itoa ( int ( it . req . IndexBuildID ) ) , strconv . Itoa ( int ( it . req . Version ) ) ,
strconv . Itoa ( int ( it . partitionID ) ) , strconv . Itoa ( int ( it . segmentID ) ) , key )
}
2020-12-22 08:14:36 +08:00
2022-01-10 20:51:34 +08:00
it . savePaths = make ( [ ] string , blobCnt )
saveIndexFile := func ( idx int ) error {
blob := blobs [ idx ]
savePath := getSavePathByKey ( blob . Key )
saveIndexFileFn := func ( ) error {
v , err := it . etcdKV . Load ( it . req . MetaPath )
if err != nil {
log . Warn ( "IndexNode load meta failed" , zap . Any ( "path" , it . req . MetaPath ) , zap . Error ( err ) )
return err
2021-05-27 22:24:29 +08:00
}
2022-01-10 20:51:34 +08:00
indexMeta := indexpb . IndexMeta { }
err = proto . Unmarshal ( [ ] byte ( v ) , & indexMeta )
2020-12-22 08:14:36 +08:00
if err != nil {
2022-01-10 20:51:34 +08:00
log . Warn ( "IndexNode Unmarshal indexMeta error " , zap . Error ( err ) )
2020-12-22 08:14:36 +08:00
return err
}
2022-01-10 20:51:34 +08:00
//log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta))
if indexMeta . Version > it . req . Version {
log . Warn ( "IndexNode try saveIndexFile failed req.Version is low" , zap . Any ( "req.Version" , it . req . Version ) ,
zap . Any ( "indexMeta.Version" , indexMeta . Version ) )
return errors . New ( "This task has been reassigned, check indexMeta.version and request " )
}
return it . kv . Save ( savePath , string ( blob . Value ) )
}
err := retry . Do ( ctx , saveIndexFileFn , retry . Attempts ( 5 ) )
if err != nil {
log . Warn ( "IndexNode try saveIndexFile final" , zap . Error ( err ) , zap . Any ( "savePath" , savePath ) )
return err
}
it . savePaths [ idx ] = savePath
return nil
}
2021-03-26 11:19:02 +08:00
2022-01-10 20:51:34 +08:00
err := funcutil . ProcessFuncParallel ( blobCnt , runtime . NumCPU ( ) , saveIndexFile , "saveIndexFile" )
if err != nil {
log . Warn ( "saveIndexFile to minio failed" , zap . Error ( err ) )
// In this case, we intend not to return err, otherwise the task will be marked as failed.
it . internalErr = err
}
return nil
}
2021-03-26 11:19:02 +08:00
2022-01-13 20:27:34 +08:00
// releaseMemory asks the Go runtime to return as much memory as possible to
// the operating system by calling debug.FreeOSMemory.
func (it *IndexBuildTask) releaseMemory() {
	debug.FreeOSMemory()
}
2022-01-10 20:51:34 +08:00
// Execute actually performs the task of building an index.
func ( it * IndexBuildTask ) Execute ( ctx context . Context ) error {
log . Debug ( "IndexNode IndexBuildTask Execute ..." , zap . Int64 ( "buildId" , it . req . IndexBuildID ) )
sp , _ := trace . StartSpanFromContextWithOperationName ( ctx , "CreateIndex-Execute" )
defer sp . Finish ( )
if err := it . executePrepareParams ( ctx ) ; err != nil {
return err
}
2022-01-17 14:43:38 +08:00
defer it . releaseMemory ( )
2022-01-10 20:51:34 +08:00
var err error
it . index , err = NewCIndex ( it . newTypeParams , it . newIndexParams )
if err != nil {
log . Error ( "IndexNode IndexBuildTask Execute NewCIndex failed" ,
zap . Int64 ( "buildId" , it . req . IndexBuildID ) ,
zap . Error ( err ) )
return err
}
defer func ( ) {
err := it . index . Delete ( )
2021-03-26 11:19:02 +08:00
if err != nil {
2022-01-10 20:51:34 +08:00
log . Error ( "IndexNode IndexBuildTask Execute CIndexDelete failed" ,
zap . Int64 ( "buildId" , it . req . IndexBuildID ) ,
zap . Error ( err ) )
2020-12-22 08:14:36 +08:00
}
2022-01-10 20:51:34 +08:00
} ( )
var blobs [ ] * storage . Blob
blobs , err = it . executeStepBuild ( ctx )
if err != nil {
return err
}
err = it . executeSave ( ctx , blobs )
if err != nil {
return err
2020-12-22 08:14:36 +08:00
}
2022-01-10 20:51:34 +08:00
it . tr . Record ( "index file save done" )
it . tr . Elapse ( "index building all done" )
log . Info ( "IndexNode CreateIndex successfully " , zap . Int64 ( "collect" , it . collectionID ) ,
zap . Int64 ( "partition" , it . partitionID ) , zap . Int64 ( "segment" , it . segmentID ) )
2022-01-13 20:27:34 +08:00
2021-01-23 20:58:46 +08:00
return nil
2020-12-10 17:55:55 +08:00
}