// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package indexnode

import (
"context"
2021-03-08 10:09:48 +08:00
"errors"
2021-06-30 19:46:14 +08:00
"fmt"
2021-09-26 19:19:57 +08:00
"path"
2021-03-26 11:19:02 +08:00
"runtime"
2020-12-22 08:14:36 +08:00
"strconv"
2022-08-25 15:48:54 +08:00
"time"
2020-12-13 06:48:05 +08:00
2022-07-14 15:08:28 +08:00
"go.uber.org/zap"
2022-04-07 22:05:32 +08:00
2022-09-16 16:56:49 +08:00
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/schemapb"
2022-09-24 13:40:52 +08:00
"github.com/milvus-io/milvus/internal/log"
2022-07-14 15:08:28 +08:00
"github.com/milvus-io/milvus/internal/metrics"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
2022-09-21 20:16:51 +08:00
"github.com/milvus-io/milvus/internal/util"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/util/funcutil"
2022-07-14 15:08:28 +08:00
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
2022-09-21 20:16:51 +08:00
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
2022-08-25 15:48:54 +08:00
"github.com/milvus-io/milvus/internal/util/logutil"
2021-06-30 19:46:14 +08:00
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/timerecord"
2020-12-10 17:55:55 +08:00
)

var (
	errCancel = fmt.Errorf("canceled")
)

type Blob = storage.Blob
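
// taskInfo is the per-task record kept by the IndexNode: the cancel function
// for the task's context, its current index state, the produced index files
// and their serialized size, the failure reason, and build statistics.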
type taskInfo struct {
	cancel         context.CancelFunc
	state          commonpb.IndexState
	indexFiles     []string
	serializedSize uint64
	failReason     string

	// task statistics
	statistic *indexpb.JobInfo
}
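
// task is the interface an index build job implements. Its caller drives it
// through Prepare, LoadData, BuildIndex, and SaveIndexFiles in order, with
// SetState/GetState reporting progress and Reset releasing references.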
type task interface {
	Ctx() context.Context
	Name() string
	Prepare(context.Context) error
	LoadData(context.Context) error
	BuildIndex(context.Context) error
	SaveIndexFiles(context.Context) error
	OnEnqueue(context.Context) error
	SetState(state commonpb.IndexState, failReason string)
	GetState() commonpb.IndexState
	Reset()
}

// indexBuildTask is used to record the information of the index tasks.
type indexBuildTask struct {
	ident  string
	cancel context.CancelFunc
	ctx    context.Context

	cm             storage.ChunkManager
	index          indexcgowrapper.CodecIndex
	savePaths      []string
	req            *indexpb.CreateJobRequest
	BuildID        UniqueID
	nodeID         UniqueID
	ClusterID      string
	collectionID   UniqueID
	partitionID    UniqueID
	segmentID      UniqueID
	fieldID        UniqueID
	fieldData      storage.FieldData
	indexBlobs     []*storage.Blob
	newTypeParams  map[string]string
	newIndexParams map[string]string
	serializedSize uint64
	tr             *timerecord.TimeRecorder
	statistic      indexpb.JobInfo
	node           *IndexNode
}
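
// Reset drops all references held by the task so they can be garbage collected.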
func (it *indexBuildTask) Reset() {
	it.ident = ""
	it.cancel = nil
	it.ctx = nil
	it.cm = nil
	it.index = nil
	it.savePaths = nil
	it.req = nil
	it.fieldData = nil
	it.indexBlobs = nil
	it.newTypeParams = nil
	it.newIndexParams = nil
	it.tr = nil
	it.node = nil
}

// Ctx returns the context of the index task.
func (it *indexBuildTask) Ctx() context.Context {
	return it.ctx
}

// Name returns the name of the task to build the index.
func (it *indexBuildTask) Name() string {
	return it.ident
}
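
// SetState stores the task state and failure reason on the owning IndexNode.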
func (it *indexBuildTask) SetState(state commonpb.IndexState, failReason string) {
	it.node.storeTaskState(it.ClusterID, it.BuildID, state, failReason)
}
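
// GetState loads the task's current state from the owning IndexNode.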
func (it *indexBuildTask) GetState() commonpb.IndexState {
	return it.node.loadTaskState(it.ClusterID, it.BuildID)
}

// OnEnqueue records the task's start time and pod ID when it is placed on the queue.
func (it *indexBuildTask) OnEnqueue(ctx context.Context) error {
	it.statistic.StartTime = time.Now().UnixMicro()
	it.statistic.PodID = it.node.GetNodeID()
	log.Ctx(ctx).Debug("IndexNode indexBuildTask Enqueue")
	return nil
}
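
// Prepare merges the request's type params and index params into flat lookup
// maps. A pair whose key is util.ParamsKeyToParse carries a stringified map
// that is parsed by funcutil.ParseIndexParamsMap and flattened into both maps;
// duplicate keys are rejected. The "dim" type param, if present, is recorded
// in the task statistics.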
func (it *indexBuildTask) Prepare(ctx context.Context) error {
	logutil.Logger(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
	typeParams := make(map[string]string)
	indexParams := make(map[string]string)
	// type params can be removed
	for _, kvPair := range it.req.GetTypeParams() {
		key, value := kvPair.GetKey(), kvPair.GetValue()
		_, ok := typeParams[key]
		if ok {
			return errors.New("duplicated key in type params")
		}
		if key == util.ParamsKeyToParse {
			params, err := funcutil.ParseIndexParamsMap(value)
			if err != nil {
				return err
			}
			for pk, pv := range params {
				typeParams[pk] = pv
				indexParams[pk] = pv
			}
		} else {
			typeParams[key] = value
			indexParams[key] = value
		}
	}

	for _, kvPair := range it.req.GetIndexParams() {
		key, value := kvPair.GetKey(), kvPair.GetValue()
		_, ok := indexParams[key]
		if ok {
			return errors.New("duplicated key in index params")
		}
		if key == util.ParamsKeyToParse {
			params, err := funcutil.ParseIndexParamsMap(value)
			if err != nil {
				return err
			}
			for pk, pv := range params {
				indexParams[pk] = pv
			}
		} else {
			indexParams[key] = value
		}
	}
	it.newTypeParams = typeParams
	it.newIndexParams = indexParams
	it.statistic.IndexParams = it.req.GetIndexParams()
	// ugly code to get the dimension
	if dimStr, ok := typeParams["dim"]; ok {
		var err error
		it.statistic.Dim, err = strconv.ParseInt(dimStr, 10, 64)
		if err != nil {
			log.Ctx(ctx).Error("parse dimension failed", zap.Error(err))
			// ignore error
		}
	}
	logutil.Logger(ctx).Info("Successfully prepared indexBuildTask", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
	// setup chunkmanager
	// opts := make([]storage.Option, 0)
	// // TODO: secret access key_id
	// opts = append(opts, storage.AccessKeyID(it.req.StorageAccessKey))
	// opts = append(opts, storage.BucketName(it.req.BucketName))
	// factory := storage.NewChunkManagerFactory("local", "minio", opts...)
	// var err error
	// it.cm, err = factory.NewVectorStorageChunkManager(ctx)
	// if err != nil {
	// 	log.Ctx(ctx).Error("init chunk manager failed", zap.Error(err), zap.String("BucketName", it.req.BucketName), zap.String("StorageAccessKey", it.req.StorageAccessKey))
	// 	return err
	// }
	return nil
}
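
// LoadData fetches the segment binlogs listed in the request from the
// ChunkManager in parallel and decodes them into the single field's data the
// index will be built on.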
func (it *indexBuildTask) LoadData(ctx context.Context) error {
	getValueByPath := func(path string) ([]byte, error) {
		data, err := it.cm.Read(ctx, path)
		if err != nil {
			if errors.Is(err, ErrNoSuchKey) {
				return nil, ErrNoSuchKey
			}
			return nil, err
		}
		return data, nil
	}
	getBlobByPath := func(path string) (*Blob, error) {
		value, err := getValueByPath(path)
		if err != nil {
			return nil, err
		}
		return &Blob{
			Key:   path,
			Value: value,
		}, nil
	}

	toLoadDataPaths := it.req.GetDataPaths()
	keys := make([]string, len(toLoadDataPaths))
	blobs := make([]*Blob, len(toLoadDataPaths))
	loadKey := func(idx int) error {
		keys[idx] = toLoadDataPaths[idx]
		blob, err := getBlobByPath(toLoadDataPaths[idx])
		if err != nil {
			return err
		}
		blobs[idx] = blob
		return nil
	}
	// Use runtime.GOMAXPROCS(0) instead of runtime.NumCPU()
	// to respect the CPU quota of the container/pod.
	// GOMAXPROCS is set by `automaxprocs`; passing 0 just retrieves the current value.
	err := funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.GOMAXPROCS(0), loadKey, "loadKey")
	if err != nil {
		log.Ctx(ctx).Warn("loadKey failed", zap.Error(err))
		return err
	}

	loadVectorDuration := it.tr.RecordSpan().Milliseconds()
	log.Ctx(ctx).Debug("indexnode load data success")
	it.tr.Record("load field data done")
	metrics.IndexNodeLoadFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(loadVectorDuration))

	err = it.decodeBlobs(ctx, blobs)
	if err != nil {
		logutil.Logger(ctx).Info("failed to decode blobs", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID), zap.Error(err))
	} else {
		logutil.Logger(ctx).Info("Successfully loaded data", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
	}
	return err
}
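
// BuildIndex builds the index through the cgo wrapper from the loaded field
// data, serializes it, and re-encodes the blobs with the index file binlog
// codec. DiskANN builds are dispatched to BuildDiskAnnIndex.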
func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
	// DiskANN indexes are built via a dedicated path
	indexType := it.newIndexParams["index_type"]
	if indexType == indexparamcheck.IndexDISKANN {
		return it.BuildDiskAnnIndex(ctx)
	}

	dataset := indexcgowrapper.GenDataset(it.fieldData)
	dType := dataset.DType
	var err error
	if dType != schemapb.DataType_None {
		it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams)
		if err != nil {
			log.Ctx(ctx).Error("failed to create index", zap.Error(err))
			return err
		}

		err = it.index.Build(dataset)
		if err != nil {
			log.Ctx(ctx).Error("failed to build index", zap.Error(err))
			return err
		}
	}

	metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(it.tr.RecordSpan().Milliseconds()))

	it.tr.Record("build index done")
	indexBlobs, err := it.index.Serialize()
	if err != nil {
		log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err))
		return err
	}
	it.tr.Record("index serialize done")

	// use the serialized size before encoding
	it.serializedSize = 0
	for _, blob := range indexBlobs {
		it.serializedSize += uint64(len(blob.Value))
	}

	// early release index for gc, and we can ensure that Delete is idempotent.
	if err := it.index.Delete(); err != nil {
		log.Ctx(ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
	}

	var serializedIndexBlobs []*storage.Blob
	codec := storage.NewIndexFileBinlogCodec()
	serializedIndexBlobs, err = codec.Serialize(
		it.req.BuildID,
		it.req.IndexVersion,
		it.collectionID,
		it.partitionID,
		it.segmentID,
		it.fieldID,
		it.newIndexParams,
		it.req.IndexName,
		it.req.IndexID,
		indexBlobs,
	)
	if err != nil {
		return err
	}
	encodeIndexFileDur := it.tr.Record("index codec serialize done")
	metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(encodeIndexFileDur.Milliseconds()))
	it.indexBlobs = serializedIndexBlobs
	logutil.Logger(ctx).Info("Successfully built index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
	return nil
}
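
// BuildDiskAnnIndex builds a DiskANN index through the cgo wrapper. DiskANN
// writes its index files to local disk during the build, so this method first
// checks the node's disk budget, passes the collection/partition/segment/field
// identifiers down as extra index params, and afterwards collects the
// resulting file names and sizes instead of serializing blobs in memory.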
func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
	// check whether this index node supports building disk indexes
	if !Params.IndexNodeCfg.EnableDisk {
		log.Ctx(ctx).Error("IndexNode doesn't support building disk index",
			zap.String("index type", it.newIndexParams["index_type"]),
			zap.Bool("enable disk", Params.IndexNodeCfg.EnableDisk))
		return errors.New("index node doesn't support building disk index")
	}

	// check the current local disk usage and the size of the field data
	localUsedSize, err := indexcgowrapper.GetLocalUsedSize()
	if err != nil {
		log.Ctx(ctx).Error("IndexNode failed to get local used size")
		return errors.New("index node failed to get local used size")
	}
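	// The 2.6 factor below is this task's estimate of peak local disk usage
	// during a DiskANN build, relative to the in-memory size of the field data.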
	usedLocalSizeWhenBuild := int64(float64(it.fieldData.GetMemorySize())*2.6) + localUsedSize
	maxUsedLocalSize := int64(float64(Params.IndexNodeCfg.DiskCapacityLimit) * Params.IndexNodeCfg.MaxDiskUsagePercentage)
	if usedLocalSizeWhenBuild > maxUsedLocalSize {
		log.Ctx(ctx).Error("IndexNode doesn't have enough disk space to build disk ann index",
			zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild),
			zap.Int64("maxUsedLocalSize", maxUsedLocalSize))
		return errors.New("index node doesn't have enough disk space to build disk ann index")
	}

	dataset := indexcgowrapper.GenDataset(it.fieldData)
	dType := dataset.DType
	if dType != schemapb.DataType_None {
		// TODO:: too ugly
		it.newIndexParams["collection_id"] = strconv.FormatInt(it.collectionID, 10)
		it.newIndexParams["partition_id"] = strconv.FormatInt(it.partitionID, 10)
		it.newIndexParams["segment_id"] = strconv.FormatInt(it.segmentID, 10)
		it.newIndexParams["field_id"] = strconv.FormatInt(it.fieldID, 10)
		it.newIndexParams["index_build_id"] = strconv.FormatInt(it.req.GetBuildID(), 10)
		it.newIndexParams["index_id"] = strconv.FormatInt(it.req.IndexID, 10)
		it.newIndexParams["index_version"] = strconv.FormatInt(it.req.GetIndexVersion(), 10)

		it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams)
		if err != nil {
			log.Ctx(ctx).Error("failed to create index", zap.Error(err))
			return err
		}

		err = it.index.Build(dataset)
		if err != nil {
			if it.index.CleanLocalData() != nil {
				log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
					zap.Int64("buildID", it.BuildID),
					zap.Int64("index version", it.req.GetIndexVersion()))
			}
			log.Ctx(ctx).Error("failed to build index", zap.Error(err))
			return err
		}
	}

	metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(it.tr.RecordSpan().Milliseconds()))
	it.tr.Record("build index done")

	fileInfos, err := it.index.GetIndexFileInfo()
	if err != nil {
		log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err))
		return err
	}
	it.tr.Record("index serialize done")

	// use the serialized size before encoding
	it.serializedSize = 0
	for _, info := range fileInfos {
		it.serializedSize += uint64(info.FileSize)
		it.indexBlobs = append(it.indexBlobs, &storage.Blob{
			Key:  info.FileName,
			Size: info.FileSize,
		})
	}

	// early release index for gc, and we can ensure that Delete is idempotent.
	if err := it.index.Delete(); err != nil {
		log.Ctx(ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
	}
	encodeIndexFileDur := it.tr.Record("index codec serialize done")
	metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(encodeIndexFileDur.Milliseconds()))
	return nil
}
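
// SaveIndexFiles uploads the encoded index blobs in parallel, retrying each
// write up to five times. Files are saved under
// <IndexFilePrefix>/<BuildID>/<IndexVersion>/<partitionID>/<segmentID>/<key>,
// and the resulting paths plus statistics are stored on the IndexNode.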
func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
	// DiskANN index files are saved via a dedicated path
	indexType := it.newIndexParams["index_type"]
	if indexType == indexparamcheck.IndexDISKANN {
		return it.SaveDiskAnnIndexFiles(ctx)
	}

	blobCnt := len(it.indexBlobs)
	getSavePathByKey := func(key string) string {
		return path.Join(it.req.IndexFilePrefix, strconv.Itoa(int(it.req.BuildID)), strconv.Itoa(int(it.req.IndexVersion)),
			strconv.Itoa(int(it.partitionID)), strconv.Itoa(int(it.segmentID)), key)
	}

	savePaths := make([]string, blobCnt)
	saveIndexFile := func(idx int) error {
		blob := it.indexBlobs[idx]
		savePath := getSavePathByKey(blob.Key)
		saveFn := func() error {
			return it.cm.Write(ctx, savePath, blob.Value)
		}
		if err := retry.Do(ctx, saveFn, retry.Attempts(5)); err != nil {
			log.Ctx(ctx).Warn("index node save index file failed", zap.Error(err), zap.String("savePath", savePath))
			return err
		}
		savePaths[idx] = savePath
		return nil
	}

	// If an error occurs, return it so that the task state will be set to retry.
	if err := funcutil.ProcessFuncParallel(blobCnt, runtime.NumCPU(), saveIndexFile, "saveIndexFile"); err != nil {
		log.Ctx(ctx).Error("saveIndexFile failed")
		return err
	}
	it.savePaths = savePaths
	it.statistic.EndTime = time.Now().UnixMicro()
	it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, savePaths, it.serializedSize, &it.statistic)
	log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
	saveIndexFileDur := it.tr.Record("index file save done")
	metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
	it.tr.Elapse("index building all done")
	log.Ctx(ctx).Info("Successfully saved index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID),
		zap.Int64("partition", it.partitionID), zap.Int64("SegmentID", it.segmentID))
	return nil
}
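
// SaveDiskAnnIndexFiles handles DiskANN results, whose index files are already
// persisted by the build itself: it collects their paths, serializes and
// uploads only the extra index params file, and then stores the paths and
// statistics on the IndexNode.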
func (it *indexBuildTask) SaveDiskAnnIndexFiles(ctx context.Context) error {
	savePaths := make([]string, len(it.indexBlobs))
	for i, blob := range it.indexBlobs {
		savePath := blob.Key
		savePaths[i] = savePath
	}

	// add the index params file
	codec := storage.NewIndexFileBinlogCodec()
	indexParamBlob, err := codec.SerializeIndexParams(
		it.req.GetBuildID(),
		it.req.GetIndexVersion(),
		it.collectionID,
		it.partitionID,
		it.segmentID,
		it.fieldID,
		it.newIndexParams,
		it.req.IndexName,
		it.req.IndexID,
	)
	if err != nil {
		return err
	}

	getSavePathByKey := func(key string) string {
		return path.Join("files/index_files", strconv.Itoa(int(it.req.BuildID)), strconv.Itoa(int(it.req.IndexVersion)),
			strconv.Itoa(int(it.partitionID)), strconv.Itoa(int(it.segmentID)), key)
	}
	indexParamPath := getSavePathByKey(indexParamBlob.Key)
	saveFn := func() error {
		return it.cm.Write(ctx, indexParamPath, indexParamBlob.Value)
	}
	if err := retry.Do(ctx, saveFn, retry.Attempts(5)); err != nil {
		log.Ctx(ctx).Warn("index node save index param file failed", zap.Error(err), zap.String("savePath", indexParamPath))
		return err
	}

	savePaths = append(savePaths, indexParamPath)
	it.savePaths = savePaths
	it.statistic.EndTime = time.Now().UnixMicro()
	it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, savePaths, it.serializedSize, &it.statistic)
	log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
	saveIndexFileDur := it.tr.Record("index file save done")
	metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
	it.tr.Elapse("index building all done")
	log.Ctx(ctx).Info("IndexNode CreateIndex successfully", zap.Int64("collection", it.collectionID),
		zap.Int64("partition", it.partitionID), zap.Int64("segment", it.segmentID))
	return nil
}
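
// decodeBlobs deserializes the loaded binlogs back into insert data, checks
// that exactly one field is present, and keeps that field's ID, data, and row
// count on the task.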
func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob) error {
	var insertCodec storage.InsertCodec
	collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(blobs)
	if err2 != nil {
		return err2
	}
	decodeDuration := it.tr.RecordSpan().Milliseconds()
	metrics.IndexNodeDecodeFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(decodeDuration))

	if len(insertData.Data) != 1 {
		return errors.New("we expect only one field in deserialized insert data")
	}
	it.collectionID = collectionID
	it.partitionID = partitionID
	it.segmentID = segmentID

	log.Ctx(ctx).Debug("indexnode deserialize data success",
		zap.Int64("index id", it.req.IndexID),
		zap.String("index name", it.req.IndexName),
		zap.Int64("collectionID", it.collectionID),
		zap.Int64("partitionID", it.partitionID),
		zap.Int64("segmentID", it.segmentID))

	it.tr.Record("deserialize vector data done")

	// we can ensure that these blobs belong to a single field
	var data storage.FieldData
	var fieldID storage.FieldID
	for fID, value := range insertData.Data {
		data = value
		fieldID = fID
		break
	}
	it.statistic.NumRows = int64(data.RowNum())
	it.fieldID = fieldID
	it.fieldData = data
	return nil
}