2021-11-01 22:51:41 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 13:50:12 +08:00
// with the License. You may obtain a copy of the License at
//
2021-11-01 22:51:41 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 13:50:12 +08:00
//
2021-11-01 22:51:41 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 13:50:12 +08:00
2021-01-15 14:38:36 +08:00
package indexnode
2020-12-10 17:55:55 +08:00
import (
"context"
2021-03-08 10:09:48 +08:00
"errors"
2021-06-30 19:46:14 +08:00
"fmt"
2021-09-26 19:19:57 +08:00
"path"
2021-03-26 11:19:02 +08:00
"runtime"
2020-12-22 08:14:36 +08:00
"strconv"
2020-12-13 06:48:05 +08:00
2021-05-27 22:24:29 +08:00
"github.com/golang/protobuf/proto"
2021-03-10 09:56:09 +08:00
"go.uber.org/zap"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/kv"
2021-05-27 22:24:29 +08:00
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/funcutil"
2021-06-30 19:46:14 +08:00
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/timerecord"
2021-07-22 11:40:11 +08:00
"github.com/milvus-io/milvus/internal/util/trace"
2020-12-10 17:55:55 +08:00
)
2021-02-07 21:32:37 +08:00
const (
2021-10-09 18:27:00 +08:00
// paramsKeyToParse is the key of the param to build index.
paramsKeyToParse = "params"
// IndexBuildTaskName is the name of the operation to add an index task.
2021-02-23 09:58:06 +08:00
IndexBuildTaskName = "IndexBuildTask"
2021-02-07 21:32:37 +08:00
)
2020-12-10 17:55:55 +08:00
type task interface {
2021-02-23 09:58:06 +08:00
Ctx ( ) context . Context
ID ( ) UniqueID // return ReqID
Name ( ) string
2020-12-10 17:55:55 +08:00
SetID ( uid UniqueID ) // set ReqID
2021-02-23 09:58:06 +08:00
PreExecute ( ctx context . Context ) error
Execute ( ctx context . Context ) error
PostExecute ( ctx context . Context ) error
2020-12-10 17:55:55 +08:00
WaitToFinish ( ) error
Notify ( err error )
2020-12-13 06:48:05 +08:00
OnEnqueue ( ) error
2021-02-27 10:45:03 +08:00
SetError ( err error )
2020-12-10 17:55:55 +08:00
}
2021-10-09 18:27:00 +08:00
// BaseTask is an basic instance of task.
2020-12-10 17:55:55 +08:00
type BaseTask struct {
2021-10-11 16:38:33 +08:00
done chan error
ctx context . Context
id UniqueID
err error
internalErr error
2021-02-27 10:45:03 +08:00
}
2021-10-09 18:27:00 +08:00
// SetError sets an error to task.
2021-02-27 10:45:03 +08:00
func ( bt * BaseTask ) SetError ( err error ) {
bt . err = err
2020-12-10 17:55:55 +08:00
}
2021-10-09 18:27:00 +08:00
// ID returns the id of index task.
2020-12-10 17:55:55 +08:00
func ( bt * BaseTask ) ID ( ) UniqueID {
return bt . id
}
func ( bt * BaseTask ) setID ( id UniqueID ) {
bt . id = id
}
2021-10-09 18:27:00 +08:00
// WaitToFinish will wait for the task to complete, if the context is done, it means that the execution of the task has timed out.
2020-12-10 17:55:55 +08:00
func ( bt * BaseTask ) WaitToFinish ( ) error {
2020-12-13 06:48:05 +08:00
select {
case <- bt . ctx . Done ( ) :
return errors . New ( "timeout" )
case err := <- bt . done :
return err
2020-12-10 17:55:55 +08:00
}
}
2021-10-09 18:27:00 +08:00
// Notify will notify WaitToFinish that the task is completed or failed.
2020-12-10 17:55:55 +08:00
func ( bt * BaseTask ) Notify ( err error ) {
bt . done <- err
}
2021-10-09 00:05:49 +08:00
// IndexBuildTask is used to record the information of the index tasks.
2021-01-26 09:38:40 +08:00
type IndexBuildTask struct {
2020-12-10 17:55:55 +08:00
BaseTask
2021-06-15 10:19:38 +08:00
index Index
kv kv . BaseKV
etcdKV * etcdkv . EtcdKV
savePaths [ ] string
req * indexpb . CreateIndexRequest
nodeID UniqueID
2021-01-26 09:38:40 +08:00
}
2021-10-09 00:05:49 +08:00
// Ctx is the context of index tasks.
2021-02-23 09:58:06 +08:00
func ( it * IndexBuildTask ) Ctx ( ) context . Context {
return it . ctx
}
2021-10-09 00:05:49 +08:00
// ID returns the id of index task.
2021-02-23 09:58:06 +08:00
func ( it * IndexBuildTask ) ID ( ) UniqueID {
return it . id
2020-12-10 17:55:55 +08:00
}
2021-10-09 00:05:49 +08:00
// SetID sets the id for index task.
2021-01-26 09:38:40 +08:00
func ( it * IndexBuildTask ) SetID ( ID UniqueID ) {
2020-12-13 06:48:05 +08:00
it . BaseTask . setID ( ID )
}
2020-12-10 17:55:55 +08:00
2021-10-09 00:05:49 +08:00
// Name is the name of task to build index.
2021-02-23 09:58:06 +08:00
func ( bt * BaseTask ) Name ( ) string {
return IndexBuildTaskName
}
2021-10-09 00:05:49 +08:00
// OnEnqueue enqueues indexing tasks.
2021-01-26 09:38:40 +08:00
func ( it * IndexBuildTask ) OnEnqueue ( ) error {
2021-03-12 14:22:09 +08:00
it . SetID ( it . req . IndexBuildID )
2021-06-06 09:41:35 +08:00
log . Debug ( "IndexNode IndexBuilderTask Enqueue" , zap . Int64 ( "TaskID" , it . ID ( ) ) )
2020-12-10 17:55:55 +08:00
return nil
}
2021-06-23 09:24:10 +08:00
func ( it * IndexBuildTask ) checkIndexMeta ( ctx context . Context , pre bool ) error {
2021-05-27 22:24:29 +08:00
fn := func ( ) error {
2021-10-13 23:24:32 +08:00
//TODO error handling need to be optimized, return Unrecoverable to avoid retry
2021-05-27 22:24:29 +08:00
indexMeta := indexpb . IndexMeta { }
_ , values , versions , err := it . etcdKV . LoadWithPrefix2 ( it . req . MetaPath )
if err != nil {
2021-09-26 21:22:04 +08:00
log . Error ( "IndexNode checkIndexMeta" , zap . Any ( "load meta error with path" , it . req . MetaPath ) ,
2021-06-06 09:41:35 +08:00
zap . Error ( err ) , zap . Any ( "pre" , pre ) )
2021-05-27 22:24:29 +08:00
return err
}
2021-09-29 20:26:00 +08:00
if len ( values ) == 0 {
return fmt . Errorf ( "IndexNode checkIndexMeta the indexMeta is empty" )
}
2021-06-06 09:41:35 +08:00
log . Debug ( "IndexNode checkIndexMeta load meta success" , zap . Any ( "path" , it . req . MetaPath ) , zap . Any ( "pre" , pre ) )
2021-09-29 20:26:00 +08:00
err = proto . Unmarshal ( [ ] byte ( values [ 0 ] ) , & indexMeta )
2021-05-27 22:24:29 +08:00
if err != nil {
2021-10-13 23:24:32 +08:00
log . Error ( "IndexNode failed to unmarshal index meta" , zap . Error ( err ) )
2021-05-27 22:24:29 +08:00
return err
}
2021-06-06 09:41:35 +08:00
log . Debug ( "IndexNode checkIndexMeta Unmarshal success" , zap . Any ( "IndexMeta" , indexMeta ) )
2021-05-27 22:24:29 +08:00
if indexMeta . Version > it . req . Version || indexMeta . State == commonpb . IndexState_Finished {
2021-10-13 23:24:32 +08:00
log . Info ( "IndexNode checkIndexMeta version mismatch" ,
zap . Any ( "req version" , it . req . Version ) ,
zap . Any ( "index meta version" , indexMeta . Version ) )
return nil
2021-05-27 22:24:29 +08:00
}
if indexMeta . MarkDeleted {
indexMeta . State = commonpb . IndexState_Finished
2021-09-29 20:26:00 +08:00
v , err := proto . Marshal ( & indexMeta )
if err != nil {
return err
}
err = it . etcdKV . CompareVersionAndSwap ( it . req . MetaPath , versions [ 0 ] , string ( v ) )
2021-05-27 22:24:29 +08:00
if err != nil {
return err
}
2021-07-23 10:44:12 +08:00
errMsg := fmt . Sprintf ( "the index has been deleted with indexBuildID %d" , indexMeta . IndexBuildID )
log . Warn ( errMsg )
return fmt . Errorf ( errMsg )
2021-05-27 22:24:29 +08:00
}
if pre {
return nil
}
indexMeta . IndexFilePaths = it . savePaths
indexMeta . State = commonpb . IndexState_Finished
2021-10-11 16:38:33 +08:00
// Under normal circumstances, it.err and it.internalErr will not be non-nil at the same time, but for the sake of insurance, the else judgment is added.
2021-05-27 22:24:29 +08:00
if it . err != nil {
2021-10-19 16:24:35 +08:00
log . Error ( "IndexNode CreateIndex failed and can not be retried" , zap . Int64 ( "IndexBuildID" , indexMeta . IndexBuildID ) , zap . Any ( "err" , it . err ) )
2021-05-27 22:24:29 +08:00
indexMeta . State = commonpb . IndexState_Failed
2021-07-29 14:47:22 +08:00
indexMeta . FailReason = it . err . Error ( )
2021-10-11 16:38:33 +08:00
} else if it . internalErr != nil {
2021-10-19 16:24:35 +08:00
log . Error ( "IndexNode CreateIndex failed, but it can retried" , zap . Int64 ( "IndexBuildID" , indexMeta . IndexBuildID ) , zap . Any ( "err" , it . internalErr ) )
2021-10-11 16:38:33 +08:00
indexMeta . State = commonpb . IndexState_Unissued
2021-05-27 22:24:29 +08:00
}
2021-10-11 16:38:33 +08:00
2021-07-23 10:44:12 +08:00
log . Debug ( "IndexNode" , zap . Int64 ( "indexBuildID" , indexMeta . IndexBuildID ) , zap . Any ( "IndexState" , indexMeta . State ) )
2021-09-29 20:26:00 +08:00
var metaValue [ ] byte
metaValue , err = proto . Marshal ( & indexMeta )
if err != nil {
2021-10-13 23:24:32 +08:00
log . Warn ( "IndexNode" , zap . Int64 ( "indexBuildID" , indexMeta . IndexBuildID ) , zap . Any ( "IndexState" , indexMeta . State ) ,
2021-09-29 20:26:00 +08:00
zap . Any ( "proto.Marshal failed:" , err ) )
return err
}
2021-10-13 23:24:32 +08:00
err = it . etcdKV . CompareVersionAndSwap ( it . req . MetaPath , versions [ 0 ] , string ( metaValue ) )
if err != nil {
log . Warn ( "IndexNode checkIndexMeta CompareVersionAndSwap" , zap . Error ( err ) )
}
2021-11-30 19:32:17 +08:00
return nil
2021-05-27 22:24:29 +08:00
}
2021-06-23 09:24:10 +08:00
err := retry . Do ( ctx , fn , retry . Attempts ( 3 ) )
2021-10-13 23:24:32 +08:00
if err != nil {
log . Error ( "IndexNode failed to checkIndexMeta" , zap . Error ( err ) )
}
2021-05-27 22:24:29 +08:00
return err
}
2021-10-09 18:24:58 +08:00
// PreExecute does some checks before building the index, for example, whether the index has been deleted.
2021-02-23 09:58:06 +08:00
func ( it * IndexBuildTask ) PreExecute ( ctx context . Context ) error {
2021-10-13 23:24:32 +08:00
log . Debug ( "IndexNode IndexBuildTask preExecute..." , zap . Int64 ( "buildId" , it . req . IndexBuildID ) )
2021-07-22 11:40:11 +08:00
sp , ctx := trace . StartSpanFromContextWithOperationName ( ctx , "CreateIndex-PreExecute" )
defer sp . Finish ( )
2021-06-23 09:24:10 +08:00
return it . checkIndexMeta ( ctx , true )
2020-12-10 17:55:55 +08:00
}
2021-10-09 18:24:58 +08:00
// PostExecute does some checks after building the index, for example, whether the index has been deleted or
// whether the index task is up to date.
2021-02-23 09:58:06 +08:00
func ( it * IndexBuildTask ) PostExecute ( ctx context . Context ) error {
2021-10-13 23:24:32 +08:00
log . Debug ( "IndexNode IndexBuildTask PostExecute..." , zap . Int64 ( "buildId" , it . req . IndexBuildID ) )
2021-07-22 11:40:11 +08:00
sp , _ := trace . StartSpanFromContextWithOperationName ( ctx , "CreateIndex-PostExecute" )
defer sp . Finish ( )
2021-06-23 09:24:10 +08:00
return it . checkIndexMeta ( ctx , false )
2020-12-13 06:48:05 +08:00
}
2021-10-09 18:24:58 +08:00
// Execute actually performs the task of building an index.
2021-02-23 09:58:06 +08:00
func ( it * IndexBuildTask ) Execute ( ctx context . Context ) error {
2021-10-13 23:24:32 +08:00
log . Debug ( "IndexNode IndexBuildTask Execute ..." , zap . Int64 ( "buildId" , it . req . IndexBuildID ) )
2021-07-22 11:40:11 +08:00
sp , _ := trace . StartSpanFromContextWithOperationName ( ctx , "CreateIndex-Execute" )
defer sp . Finish ( )
2021-06-30 19:46:14 +08:00
tr := timerecord . NewTimeRecorder ( fmt . Sprintf ( "IndexBuildTask %d" , it . req . IndexBuildID ) )
2021-01-26 09:38:40 +08:00
var err error
2021-02-07 21:32:37 +08:00
2020-12-22 08:14:36 +08:00
typeParams := make ( map [ string ] string )
2021-03-12 14:22:09 +08:00
for _ , kvPair := range it . req . GetTypeParams ( ) {
2020-12-22 08:14:36 +08:00
key , value := kvPair . GetKey ( ) , kvPair . GetValue ( )
_ , ok := typeParams [ key ]
if ok {
return errors . New ( "duplicated key in type params" )
}
2021-02-07 21:32:37 +08:00
if key == paramsKeyToParse {
params , err := funcutil . ParseIndexParamsMap ( value )
if err != nil {
return err
}
for pk , pv := range params {
typeParams [ pk ] = pv
}
} else {
typeParams [ key ] = value
}
2020-12-22 08:14:36 +08:00
}
indexParams := make ( map [ string ] string )
2021-03-12 14:22:09 +08:00
for _ , kvPair := range it . req . GetIndexParams ( ) {
2020-12-22 08:14:36 +08:00
key , value := kvPair . GetKey ( ) , kvPair . GetValue ( )
_ , ok := indexParams [ key ]
if ok {
return errors . New ( "duplicated key in index params" )
}
2021-02-07 21:32:37 +08:00
if key == paramsKeyToParse {
params , err := funcutil . ParseIndexParamsMap ( value )
if err != nil {
return err
}
for pk , pv := range params {
indexParams [ pk ] = pv
}
} else {
indexParams [ key ] = value
}
2020-12-22 08:14:36 +08:00
}
it . index , err = NewCIndex ( typeParams , indexParams )
if err != nil {
2021-10-13 23:24:32 +08:00
log . Error ( "IndexNode IndexBuildTask Execute NewCIndex failed" ,
zap . Int64 ( "buildId" , it . req . IndexBuildID ) ,
zap . Error ( err ) )
2020-12-22 08:14:36 +08:00
return err
}
2021-02-22 18:33:40 +08:00
defer func ( ) {
err = it . index . Delete ( )
if err != nil {
2021-10-19 16:24:35 +08:00
log . Warn ( "IndexNode IndexBuildTask Execute CIndexDelete failed" ,
2021-10-13 23:24:32 +08:00
zap . Int64 ( "buildId" , it . req . IndexBuildID ) ,
zap . Error ( err ) )
2021-02-22 18:33:40 +08:00
}
} ( )
2020-12-22 08:14:36 +08:00
getKeyByPathNaive := func ( path string ) string {
// splitElements := strings.Split(path, "/")
// return splitElements[len(splitElements)-1]
return path
}
getValueByPath := func ( path string ) ( [ ] byte , error ) {
data , err := it . kv . Load ( path )
if err != nil {
return nil , err
}
return [ ] byte ( data ) , nil
}
getBlobByPath := func ( path string ) ( * Blob , error ) {
value , err := getValueByPath ( path )
if err != nil {
return nil , err
}
return & Blob {
Key : getKeyByPathNaive ( path ) ,
Value : value ,
} , nil
}
getStorageBlobs := func ( blobs [ ] * Blob ) [ ] * storage . Blob {
2020-12-25 11:10:31 +08:00
return blobs
2020-12-22 08:14:36 +08:00
}
2021-03-12 14:22:09 +08:00
toLoadDataPaths := it . req . GetDataPaths ( )
2021-03-26 11:19:02 +08:00
keys := make ( [ ] string , len ( toLoadDataPaths ) )
blobs := make ( [ ] * Blob , len ( toLoadDataPaths ) )
loadKey := func ( idx int ) error {
keys [ idx ] = getKeyByPathNaive ( toLoadDataPaths [ idx ] )
blob , err := getBlobByPath ( toLoadDataPaths [ idx ] )
2020-12-22 08:14:36 +08:00
if err != nil {
return err
}
2021-03-26 11:19:02 +08:00
blobs [ idx ] = blob
return nil
}
err = funcutil . ProcessFuncParallel ( len ( toLoadDataPaths ) , runtime . NumCPU ( ) , loadKey , "loadKey" )
if err != nil {
2021-10-11 16:38:33 +08:00
log . Warn ( "loadKey from minio failed" , zap . Error ( err ) )
it . internalErr = err
// In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned.
return nil
2020-12-22 08:14:36 +08:00
}
2021-10-13 23:24:32 +08:00
log . Debug ( "IndexNode load data success" , zap . Int64 ( "buildId" , it . req . IndexBuildID ) )
2021-06-30 19:46:14 +08:00
tr . Record ( "loadKey done" )
2020-12-22 08:14:36 +08:00
storageBlobs := getStorageBlobs ( blobs )
var insertCodec storage . InsertCodec
2021-09-29 09:52:12 +08:00
collectionID , partitionID , segmentID , insertData , err2 := insertCodec . DeserializeAll ( storageBlobs )
2021-01-15 14:38:36 +08:00
if err2 != nil {
return err2
}
2020-12-22 08:14:36 +08:00
if len ( insertData . Data ) != 1 {
return errors . New ( "we expect only one field in deserialized insert data" )
}
2021-06-30 19:46:14 +08:00
tr . Record ( "deserialize storage blobs done" )
2020-12-22 08:14:36 +08:00
2021-09-29 09:52:12 +08:00
for fieldID , value := range insertData . Data {
2020-12-22 08:14:36 +08:00
// TODO: BinaryVectorFieldData
2021-01-07 15:39:20 +08:00
floatVectorFieldData , fOk := value . ( * storage . FloatVectorFieldData )
if fOk {
err = it . index . BuildFloatVecIndexWithoutIds ( floatVectorFieldData . Data )
if err != nil {
2021-06-06 09:41:35 +08:00
log . Error ( "IndexNode BuildFloatVecIndexWithoutIds failed" , zap . Error ( err ) )
2021-01-07 15:39:20 +08:00
return err
}
2021-06-30 19:46:14 +08:00
tr . Record ( "build float vector index done" )
2021-01-07 14:56:17 +08:00
}
2021-01-07 15:39:20 +08:00
binaryVectorFieldData , bOk := value . ( * storage . BinaryVectorFieldData )
if bOk {
err = it . index . BuildBinaryVecIndexWithoutIds ( binaryVectorFieldData . Data )
if err != nil {
2021-06-06 09:41:35 +08:00
log . Error ( "IndexNode BuildBinaryVecIndexWithoutIds failed" , zap . Error ( err ) )
2021-01-07 15:39:20 +08:00
return err
}
2021-06-30 19:46:14 +08:00
tr . Record ( "build binary vector index done" )
2021-01-07 15:39:20 +08:00
}
2021-01-07 16:13:28 +08:00
if ! fOk && ! bOk {
2021-01-07 15:39:20 +08:00
return errors . New ( "we expect FloatVectorFieldData or BinaryVectorFieldData" )
2020-12-22 08:14:36 +08:00
}
indexBlobs , err := it . index . Serialize ( )
if err != nil {
2021-06-06 09:41:35 +08:00
log . Error ( "IndexNode index Serialize failed" , zap . Error ( err ) )
2020-12-22 08:14:36 +08:00
return err
}
2021-06-30 19:46:14 +08:00
tr . Record ( "serialize index done" )
2020-12-22 08:14:36 +08:00
2021-09-29 09:52:12 +08:00
codec := storage . NewIndexFileBinlogCodec ( )
serializedIndexBlobs , err := codec . Serialize (
it . req . IndexBuildID ,
it . req . Version ,
collectionID ,
partitionID ,
segmentID ,
fieldID ,
indexParams ,
it . req . IndexName ,
it . req . IndexID ,
getStorageBlobs ( indexBlobs ) ,
)
2020-12-22 08:14:36 +08:00
if err != nil {
return err
}
2021-06-30 19:46:14 +08:00
tr . Record ( "serialize index codec done" )
2020-12-22 08:14:36 +08:00
getSavePathByKey := func ( key string ) string {
2021-09-26 19:19:57 +08:00
2021-11-05 11:45:00 +08:00
return path . Join ( Params . IndexStorageRootPath , strconv . Itoa ( int ( it . req . IndexBuildID ) ) , strconv . Itoa ( int ( it . req . Version ) ) ,
2021-09-26 19:19:57 +08:00
strconv . Itoa ( int ( partitionID ) ) , strconv . Itoa ( int ( segmentID ) ) , key )
2020-12-22 08:14:36 +08:00
}
saveBlob := func ( path string , value [ ] byte ) error {
return it . kv . Save ( path , string ( value ) )
}
2021-03-26 11:19:02 +08:00
it . savePaths = make ( [ ] string , len ( serializedIndexBlobs ) )
saveIndexFile := func ( idx int ) error {
blob := serializedIndexBlobs [ idx ]
2020-12-25 11:10:31 +08:00
key , value := blob . Key , blob . Value
2021-03-26 11:19:02 +08:00
2020-12-22 08:14:36 +08:00
savePath := getSavePathByKey ( key )
2021-03-26 11:19:02 +08:00
2021-05-27 22:24:29 +08:00
saveIndexFileFn := func ( ) error {
v , err := it . etcdKV . Load ( it . req . MetaPath )
if err != nil {
2021-10-13 23:24:32 +08:00
log . Warn ( "IndexNode load meta failed" , zap . Any ( "path" , it . req . MetaPath ) , zap . Error ( err ) )
2021-05-27 22:24:29 +08:00
return err
}
indexMeta := indexpb . IndexMeta { }
2021-09-29 20:26:00 +08:00
err = proto . Unmarshal ( [ ] byte ( v ) , & indexMeta )
2021-05-27 22:24:29 +08:00
if err != nil {
2021-10-13 23:24:32 +08:00
log . Warn ( "IndexNode Unmarshal indexMeta error " , zap . Error ( err ) )
2021-05-27 22:24:29 +08:00
return err
}
2021-06-30 19:46:14 +08:00
//log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta))
2021-05-27 22:24:29 +08:00
if indexMeta . Version > it . req . Version {
2021-09-26 21:22:04 +08:00
log . Warn ( "IndexNode try saveIndexFile failed req.Version is low" , zap . Any ( "req.Version" , it . req . Version ) ,
2021-06-06 09:41:35 +08:00
zap . Any ( "indexMeta.Version" , indexMeta . Version ) )
2021-10-13 23:24:32 +08:00
return errors . New ( "This task has been reassigned, check indexMeta.version and request " )
2021-05-27 22:24:29 +08:00
}
return saveBlob ( savePath , value )
}
2021-06-23 09:24:10 +08:00
err := retry . Do ( ctx , saveIndexFileFn , retry . Attempts ( 5 ) )
2020-12-22 08:14:36 +08:00
if err != nil {
2021-10-13 23:24:32 +08:00
log . Warn ( "IndexNode try saveIndexFile final" , zap . Error ( err ) , zap . Any ( "savePath" , savePath ) )
2020-12-22 08:14:36 +08:00
return err
}
2021-03-26 11:19:02 +08:00
it . savePaths [ idx ] = savePath
return nil
}
err = funcutil . ProcessFuncParallel ( len ( serializedIndexBlobs ) , runtime . NumCPU ( ) , saveIndexFile , "saveIndexFile" )
if err != nil {
2021-10-11 16:38:33 +08:00
log . Warn ( "saveIndexFile to minio failed" , zap . Error ( err ) )
it . internalErr = err
// In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned.
return nil
2020-12-22 08:14:36 +08:00
}
2021-06-30 19:46:14 +08:00
tr . Record ( "save index file done" )
2020-12-22 08:14:36 +08:00
}
2021-10-13 23:24:32 +08:00
log . Info ( "IndexNode CreateIndex successfully " , zap . Int64 ( "collect" , collectionID ) ,
2021-11-17 16:41:13 +08:00
zap . Int64 ( "partition" , partitionID ) , zap . Int64 ( "segment" , segmentID ) )
2021-06-30 19:46:14 +08:00
tr . Elapse ( "all done" )
2021-01-23 20:58:46 +08:00
return nil
2020-12-10 17:55:55 +08:00
}