enhance: Refine IndexNode code and ensure compatibility (#33458)

issue: #33432, #33183
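In short: the two hand-rolled task literals in CreateJob are replaced by the newIndexBuildTask/newIndexBuildTaskV2 constructors, per-task fields (collection/partition/segment/field) are now read from the CreateJobRequest with compatibility fills for requests sent by v2.3.x and v2.4.x coordinators, and the unused per-task JobInfo statistics are removed. A minimal sketch of the resulting dispatch, using only names that appear in this diff (validation and logging elided):

var task task
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
	// storage-v2 task; parseParams fills req.Field for pre-v2.5.0 requests
	task = newIndexBuildTaskV2(taskCtx, taskCancel, req, i)
} else {
	task = newIndexBuildTask(taskCtx, taskCancel, req, cm, i)
}
if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
	return merr.Status(err), nil // surface the enqueue failure to the caller
}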

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2024-06-05 19:35:52 +08:00 committed by GitHub
parent 8ad26093ba
commit 412ccfbb20
6 changed files with 695 additions and 924 deletions

View File

@ -1,14 +1,18 @@
package indexnode
import "math/rand"
import (
"fmt"
"math/rand"
const (
dim = 8
nb = 10000
nprobe = 8
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func generateFloatVectors() []float32 {
func generateFloatVectors(nb, dim int) []float32 {
vectors := make([]float32, 0)
for i := 0; i < nb; i++ {
for j := 0; j < dim; j++ {
@ -18,12 +22,147 @@ func generateFloatVectors() []float32 {
return vectors
}
func generateBinaryVectors() []byte {
vectors := make([]byte, 0)
for i := 0; i < nb; i++ {
for j := 0; j < dim/8; j++ {
vectors = append(vectors, byte(rand.Intn(8)))
}
}
return vectors
}
func generateTestSchema() *schemapb.CollectionSchema {
schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
{FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool},
{FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8},
{FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16},
{FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64},
{FieldID: 14, Name: "float", DataType: schemapb.DataType_Float},
{FieldID: 15, Name: "double", DataType: schemapb.DataType_Double},
{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar},
{FieldID: 17, Name: "string", DataType: schemapb.DataType_String},
{FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
{FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON},
{FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32},
{FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
}},
{FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
}},
{FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
}},
{FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
}},
{FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "28433"},
}},
}}
return schema
}
func generateTestData(collID, partID, segID int64, num int) ([]*Blob, error) {
insertCodec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: collID, Schema: generateTestSchema()})
var (
field0 []int64
field1 []int64
field10 []bool
field11 []int8
field12 []int16
field13 []int64
field14 []float32
field15 []float64
field16 []string
field17 []string
field18 []*schemapb.ScalarField
field19 [][]byte
field101 []int32
field102 []float32
field103 []byte
field104 []byte
field105 []byte
field106 [][]byte
)
for i := 1; i <= num; i++ {
field0 = append(field0, int64(i))
field1 = append(field1, int64(i))
field10 = append(field10, true)
field11 = append(field11, int8(i))
field12 = append(field12, int16(i))
field13 = append(field13, int64(i))
field14 = append(field14, float32(i))
field15 = append(field15, float64(i))
field16 = append(field16, fmt.Sprint(i))
field17 = append(field17, fmt.Sprint(i))
arr := &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{int32(i), int32(i), int32(i)}},
},
}
field18 = append(field18, arr)
field19 = append(field19, []byte{byte(i)})
field101 = append(field101, int32(i))
f102 := make([]float32, 8)
for j := range f102 {
f102[j] = float32(i)
}
field102 = append(field102, f102...)
field103 = append(field103, 0xff)
f104 := make([]byte, 16)
for j := range f104 {
f104[j] = byte(i)
}
field104 = append(field104, f104...)
field105 = append(field105, f104...)
field106 = append(field106, typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4}))
}
data := &storage.InsertData{Data: map[int64]storage.FieldData{
common.RowIDField: &storage.Int64FieldData{Data: field0},
common.TimeStampField: &storage.Int64FieldData{Data: field1},
10: &storage.BoolFieldData{Data: field10},
11: &storage.Int8FieldData{Data: field11},
12: &storage.Int16FieldData{Data: field12},
13: &storage.Int64FieldData{Data: field13},
14: &storage.FloatFieldData{Data: field14},
15: &storage.DoubleFieldData{Data: field15},
16: &storage.StringFieldData{Data: field16},
17: &storage.StringFieldData{Data: field17},
18: &storage.ArrayFieldData{Data: field18},
19: &storage.JSONFieldData{Data: field19},
101: &storage.Int32FieldData{Data: field101},
102: &storage.FloatVectorFieldData{
Data: field102,
Dim: 8,
},
103: &storage.BinaryVectorFieldData{
Data: field103,
Dim: 8,
},
104: &storage.Float16VectorFieldData{
Data: field104,
Dim: 8,
},
105: &storage.BFloat16VectorFieldData{
Data: field105,
Dim: 8,
},
106: &storage.SparseFloatVectorFieldData{
SparseFloatArray: schemapb.SparseFloatArray{
Dim: 28433,
Contents: field106,
},
},
}}
blobs, err := insertCodec.Serialize(partID, segID, data)
return blobs, err
}
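For orientation, a sketch of how this helper is consumed by the suite below: each returned blob is keyed by its field ID, mapped to an insert-binlog path, and written through the chunk manager (this mirrors SetupTest; cm and logID are assumed to be initialized as they are there):

blobs, err := generateTestData(collID, partID, segID, 1025)
if err != nil {
	return err
}
for i, blob := range blobs {
	// blob.GetKey() holds the field ID as a decimal string
	fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
	filePath, err := binlog.BuildLogPath(storage.InsertBinlog, collID, partID, segID, fID, logID+int64(i))
	if err != nil {
		return err
	}
	if err := cm.Write(context.Background(), filePath, blob.GetValue()); err != nil {
		return err
	}
}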

View File

@ -21,7 +21,6 @@ import (
"fmt"
"strconv"
"github.com/golang/protobuf/proto"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -36,7 +35,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -100,35 +98,9 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
}
var task task
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
task = &indexBuildTaskV2{
indexBuildTask: &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
ctx: taskCtx,
cancel: taskCancel,
BuildID: req.GetBuildID(),
ClusterID: req.GetClusterID(),
node: i,
req: req,
cm: cm,
nodeID: i.GetNodeID(),
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
serializedSize: 0,
},
}
task = newIndexBuildTaskV2(taskCtx, taskCancel, req, i)
} else {
task = &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
ctx: taskCtx,
cancel: taskCancel,
BuildID: req.GetBuildID(),
ClusterID: req.GetClusterID(),
node: i,
req: req,
cm: cm,
nodeID: i.GetNodeID(),
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
serializedSize: 0,
}
task = newIndexBuildTask(taskCtx, taskCancel, req, cm, i)
}
ret := merr.Success()
if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
@ -222,6 +194,7 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest)
return merr.Success(), nil
}
// GetJobStats should be renamed to GetSlots; it now only reports available task slots.
func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
if err := i.lifetime.Add(merr.IsHealthyOrStopping); err != nil {
log.Ctx(ctx).Warn("index node not ready", zap.Error(err))
@ -231,12 +204,6 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
}
defer i.lifetime.Done()
unissued, active := i.sched.IndexBuildQueue.GetTaskNum()
jobInfos := make([]*indexpb.JobInfo, 0)
i.foreachTaskInfo(func(ClusterID string, buildID UniqueID, info *taskInfo) {
if info.statistic != nil {
jobInfos = append(jobInfos, proto.Clone(info.statistic).(*indexpb.JobInfo))
}
})
slots := 0
if i.sched.buildParallel > unissued+active {
slots = i.sched.buildParallel - unissued - active
@ -252,7 +219,6 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
InProgressJobNum: int64(active),
EnqueueJobNum: int64(unissued),
TaskSlots: int64(slots),
JobInfos: jobInfos,
EnableDisk: Params.IndexNodeCfg.EnableDisk.GetAsBool(),
}, nil
}
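With JobInfos removed from the response, GetJobStats is effectively a slot report (hence the rename note above). A hypothetical consumer sketch; the nodeClient handle is an assumption, not part of this diff:

// Hypothetical: pick an IndexNode that still has free build slots.
resp, err := nodeClient.GetJobStats(ctx, &indexpb.GetJobStatsRequest{})
if err == nil && merr.Ok(resp.GetStatus()) && resp.GetTaskSlots() > 0 {
	// this node can accept another CreateJob request
}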

View File

@ -19,442 +19,23 @@ package indexnode
import (
"context"
"os"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
//func TestRegister(t *testing.T) {
// var (
// factory = &mockFactory{}
// ctx = context.TODO()
// )
// Params.Init()
// in, err := NewIndexNode(ctx, factory)
// assert.NoError(t, err)
// in.SetEtcdClient(getEtcdClient())
// assert.Nil(t, in.initSession())
// assert.Nil(t, in.Register())
// key := in.session.ServerName
// if !in.session.Exclusive {
// key = fmt.Sprintf("%s-%d", key, in.session.ServerID)
// }
// resp, err := getEtcdClient().Get(ctx, path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, key))
// assert.NoError(t, err)
// assert.Equal(t, int64(1), resp.Count)
// sess := &sessionutil.Session{}
// assert.Nil(t, json.Unmarshal(resp.Kvs[0].Value, sess))
// assert.Equal(t, sess.ServerID, in.session.ServerID)
// assert.Equal(t, sess.Address, in.session.Address)
// assert.Equal(t, sess.ServerName, in.session.ServerName)
//
// // revoke lease
// in.session.Revoke(time.Second)
//
// in.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/lib/milvus"))
// t.Run("CreateIndex FloatVector", func(t *testing.T) {
// var insertCodec storage.InsertCodec
//
// insertCodec.Schema = &etcdpb.CollectionMeta{
// ID: collectionID,
// Schema: &schemapb.CollectionSchema{
// Fields: []*schemapb.FieldSchema{
// {
// FieldID: floatVectorFieldID,
// Name: floatVectorFieldName,
// IsPrimaryKey: false,
// DataType: schemapb.DataType_FloatVector,
// },
// },
// },
// }
// data := make(map[UniqueID]storage.FieldData)
// tsData := make([]int64, nb)
// for i := 0; i < nb; i++ {
// tsData[i] = int64(i + 100)
// }
// data[tsFieldID] = &storage.Int64FieldData{
// NumRows: []int64{nb},
// Data: tsData,
// }
// data[floatVectorFieldID] = &storage.FloatVectorFieldData{
// NumRows: []int64{nb},
// Data: generateFloatVectors(),
// Dim: dim,
// }
// insertData := storage.InsertData{
// Data: data,
// Infos: []storage.BlobInfo{
// {
// Length: 10,
// },
// },
// }
// binLogs, _, err := insertCodec.Serialize(999, 888, &insertData)
// assert.NoError(t, err)
// kvs := make(map[string][]byte, len(binLogs))
// paths := make([]string, 0, len(binLogs))
// for i, blob := range binLogs {
// key := path.Join(floatVectorBinlogPath, strconv.Itoa(i))
// paths = append(paths, key)
// kvs[key] = blob.Value[:]
// }
// err = in.chunkManager.MultiWrite(kvs)
// assert.NoError(t, err)
//
// indexMeta := &indexpb.IndexMeta{
// IndexBuildID: indexBuildID1,
// State: commonpb.IndexState_InProgress,
// IndexVersion: 1,
// }
//
// value, err := proto.Marshal(indexMeta)
// assert.NoError(t, err)
// err = in.etcdKV.Save(metaPath1, string(value))
// assert.NoError(t, err)
// req := &indexpb.CreateIndexRequest{
// IndexBuildID: indexBuildID1,
// IndexName: "FloatVector",
// IndexID: indexID,
// Version: 1,
// MetaPath: metaPath1,
// DataPaths: paths,
// TypeParams: []*commonpb.KeyValuePair{
// {
// Key: common.DimKey,
// Value: "8",
// },
// },
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: common.IndexTypeKey,
// Value: "IVF_SQ8",
// },
// {
// Key: common.IndexParamsKey,
// Value: "{\"nlist\": 128}",
// },
// {
// Key: common.MetricTypeKey,
// Value: "L2",
// },
// },
// }
//
// status, err2 := in.CreateIndex(ctx, req)
// assert.Nil(t, err2)
// assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
//
// strValue, err3 := in.etcdKV.Load(metaPath1)
// assert.Nil(t, err3)
// indexMetaTmp := indexpb.IndexMeta{}
// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
// assert.NoError(t, err)
// for indexMetaTmp.State != commonpb.IndexState_Finished {
// time.Sleep(100 * time.Millisecond)
// strValue, err := in.etcdKV.Load(metaPath1)
// assert.NoError(t, err)
// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
// assert.NoError(t, err)
// }
// defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFileKeys)
// defer func() {
// for k := range kvs {
// err = in.chunkManager.Remove(k)
// assert.NoError(t, err)
// }
// }()
//
// defer in.etcdKV.RemoveWithPrefix(metaPath1)
// })
// t.Run("CreateIndex BinaryVector", func(t *testing.T) {
// var insertCodec storage.InsertCodec
//
// insertCodec.Schema = &etcdpb.CollectionMeta{
// ID: collectionID,
// Schema: &schemapb.CollectionSchema{
// Fields: []*schemapb.FieldSchema{
// {
// FieldID: binaryVectorFieldID,
// Name: binaryVectorFieldName,
// IsPrimaryKey: false,
// DataType: schemapb.DataType_BinaryVector,
// },
// },
// },
// }
// data := make(map[UniqueID]storage.FieldData)
// tsData := make([]int64, nb)
// for i := 0; i < nb; i++ {
// tsData[i] = int64(i + 100)
// }
// data[tsFieldID] = &storage.Int64FieldData{
// NumRows: []int64{nb},
// Data: tsData,
// }
// data[binaryVectorFieldID] = &storage.BinaryVectorFieldData{
// NumRows: []int64{nb},
// Data: generateBinaryVectors(),
// Dim: dim,
// }
// insertData := storage.InsertData{
// Data: data,
// Infos: []storage.BlobInfo{
// {
// Length: 10,
// },
// },
// }
// binLogs, _, err := insertCodec.Serialize(999, 888, &insertData)
// assert.NoError(t, err)
// kvs := make(map[string][]byte, len(binLogs))
// paths := make([]string, 0, len(binLogs))
// for i, blob := range binLogs {
// key := path.Join(binaryVectorBinlogPath, strconv.Itoa(i))
// paths = append(paths, key)
// kvs[key] = blob.Value[:]
// }
// err = in.chunkManager.MultiWrite(kvs)
// assert.NoError(t, err)
//
// indexMeta := &indexpb.IndexMeta{
// IndexBuildID: indexBuildID2,
// State: commonpb.IndexState_InProgress,
// IndexVersion: 1,
// }
//
// value, err := proto.Marshal(indexMeta)
// assert.NoError(t, err)
// err = in.etcdKV.Save(metaPath2, string(value))
// assert.NoError(t, err)
// req := &indexpb.CreateIndexRequest{
// IndexBuildID: indexBuildID2,
// IndexName: "BinaryVector",
// IndexID: indexID,
// Version: 1,
// MetaPath: metaPath2,
// DataPaths: paths,
// TypeParams: []*commonpb.KeyValuePair{
// {
// Key: common.DimKey,
// Value: "8",
// },
// },
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: common.IndexTypeKey,
// Value: "BIN_FLAT",
// },
// {
// Key: common.MetricTypeKey,
// Value: "JACCARD",
// },
// },
// }
//
// status, err2 := in.CreateIndex(ctx, req)
// assert.Nil(t, err2)
// assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
//
// strValue, err3 := in.etcdKV.Load(metaPath2)
// assert.Nil(t, err3)
// indexMetaTmp := indexpb.IndexMeta{}
// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
// assert.NoError(t, err)
// for indexMetaTmp.State != commonpb.IndexState_Finished {
// time.Sleep(100 * time.Millisecond)
// strValue, err = in.etcdKV.Load(metaPath2)
// assert.NoError(t, err)
// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
// assert.NoError(t, err)
// }
// defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFileKeys)
// defer func() {
// for k := range kvs {
// err = in.chunkManager.Remove(k)
// assert.NoError(t, err)
// }
// }()
//
// defer in.etcdKV.RemoveWithPrefix(metaPath2)
// })
//
// t.Run("Create DeletedIndex", func(t *testing.T) {
// var insertCodec storage.InsertCodec
//
// insertCodec.Schema = &etcdpb.CollectionMeta{
// ID: collectionID,
// Schema: &schemapb.CollectionSchema{
// Fields: []*schemapb.FieldSchema{
// {
// FieldID: floatVectorFieldID,
// Name: floatVectorFieldName,
// IsPrimaryKey: false,
// DataType: schemapb.DataType_FloatVector,
// },
// },
// },
// }
// data := make(map[UniqueID]storage.FieldData)
// tsData := make([]int64, nb)
// for i := 0; i < nb; i++ {
// tsData[i] = int64(i + 100)
// }
// data[tsFieldID] = &storage.Int64FieldData{
// NumRows: []int64{nb},
// Data: tsData,
// }
// data[floatVectorFieldID] = &storage.FloatVectorFieldData{
// NumRows: []int64{nb},
// Data: generateFloatVectors(),
// Dim: dim,
// }
// insertData := storage.InsertData{
// Data: data,
// Infos: []storage.BlobInfo{
// {
// Length: 10,
// },
// },
// }
// binLogs, _, err := insertCodec.Serialize(999, 888, &insertData)
// assert.NoError(t, err)
// kvs := make(map[string][]byte, len(binLogs))
// paths := make([]string, 0, len(binLogs))
// for i, blob := range binLogs {
// key := path.Join(floatVectorBinlogPath, strconv.Itoa(i))
// paths = append(paths, key)
// kvs[key] = blob.Value[:]
// }
// err = in.chunkManager.MultiWrite(kvs)
// assert.NoError(t, err)
//
// indexMeta := &indexpb.IndexMeta{
// IndexBuildID: indexBuildID1,
// State: commonpb.IndexState_InProgress,
// IndexVersion: 1,
// MarkDeleted: true,
// }
//
// value, err := proto.Marshal(indexMeta)
// assert.NoError(t, err)
// err = in.etcdKV.Save(metaPath3, string(value))
// assert.NoError(t, err)
// req := &indexpb.CreateIndexRequest{
// IndexBuildID: indexBuildID1,
// IndexName: "FloatVector",
// IndexID: indexID,
// Version: 1,
// MetaPath: metaPath3,
// DataPaths: paths,
// TypeParams: []*commonpb.KeyValuePair{
// {
// Key: common.DimKey,
// Value: "8",
// },
// },
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: common.IndexTypeKey,
// Value: "IVF_SQ8",
// },
// {
// Key: common.IndexParamsKey,
// Value: "{\"nlist\": 128}",
// },
// {
// Key: common.MetricTypeKey,
// Value: "L2",
// },
// },
// }
//
// status, err2 := in.CreateIndex(ctx, req)
// assert.Nil(t, err2)
// assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
// time.Sleep(100 * time.Millisecond)
// strValue, err3 := in.etcdKV.Load(metaPath3)
// assert.Nil(t, err3)
// indexMetaTmp := indexpb.IndexMeta{}
// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
// assert.NoError(t, err)
// assert.Equal(t, true, indexMetaTmp.MarkDeleted)
// assert.Equal(t, int64(1), indexMetaTmp.IndexVersion)
// //for indexMetaTmp.State != commonpb.IndexState_Finished {
// // time.Sleep(100 * time.Millisecond)
// // strValue, err := in.etcdKV.Load(metaPath3)
// // assert.NoError(t, err)
// // err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
// // assert.NoError(t, err)
// //}
// defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFileKeys)
// defer func() {
// for k := range kvs {
// err = in.chunkManager.Remove(k)
// assert.NoError(t, err)
// }
// }()
//
// defer in.etcdKV.RemoveWithPrefix(metaPath3)
// })
//
// t.Run("GetComponentStates", func(t *testing.T) {
// resp, err := in.GetComponentStates(ctx)
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
// assert.Equal(t, commonpb.StateCode_Healthy, resp.State.StateCode)
// })
//
// t.Run("GetTimeTickChannel", func(t *testing.T) {
// resp, err := in.GetTimeTickChannel(ctx)
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
// })
//
// t.Run("GetStatisticsChannel", func(t *testing.T) {
// resp, err := in.GetStatisticsChannel(ctx)
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
// })
//
// t.Run("ShowConfigurations", func(t *testing.T) {
// pattern := "Port"
// req := &internalpb.ShowConfigurationsRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_WatchQueryChannels,
// MsgID: rand.Int63(),
// },
// Pattern: pattern,
// }
//
// resp, err := in.ShowConfigurations(ctx, req)
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
// assert.Equal(t, 1, len(resp.Configuations))
// assert.Equal(t, "indexnode.port", resp.Configuations[0].Key)
// })
//
// t.Run("GetMetrics_system_info", func(t *testing.T) {
// req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
// assert.NoError(t, err)
// resp, err := in.GetMetrics(ctx, req)
// assert.NoError(t, err)
// log.Info("GetMetrics_system_info",
// zap.String("resp", resp.Response),
// zap.String("name", resp.ComponentName))
// })
// err = in.etcdKV.RemoveWithPrefix("session/IndexNode")
// assert.NoError(t, err)
//
// resp, err = getEtcdClient().Get(ctx, path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, in.session.ServerName))
// assert.NoError(t, err)
// assert.Equal(t, resp.Count, int64(0))
//}
func TestComponentState(t *testing.T) {
var (
factory = &mockFactory{
@ -591,3 +172,360 @@ func TestMain(m *testing.M) {
teardown()
os.Exit(code)
}
type IndexNodeSuite struct {
suite.Suite
collID int64
partID int64
segID int64
fieldID int64
logID int64
data []*Blob
in *IndexNode
storageConfig *indexpb.StorageConfig
cm storage.ChunkManager
}
func Test_IndexNodeSuite(t *testing.T) {
suite.Run(t, new(IndexNodeSuite))
}
func (s *IndexNodeSuite) SetupTest() {
s.collID = 1
s.partID = 2
s.segID = 3
s.fieldID = 102
s.logID = 10000
paramtable.Init()
Params.MinioCfg.RootPath.SwapTempValue("indexnode-ut")
var err error
s.data, err = generateTestData(s.collID, s.partID, s.segID, 1025)
s.NoError(err)
s.storageConfig = &indexpb.StorageConfig{
Address: Params.MinioCfg.Address.GetValue(),
AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(),
SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(),
UseSSL: Params.MinioCfg.UseSSL.GetAsBool(),
SslCACert: Params.MinioCfg.SslCACert.GetValue(),
BucketName: Params.MinioCfg.BucketName.GetValue(),
RootPath: Params.MinioCfg.RootPath.GetValue(),
UseIAM: Params.MinioCfg.UseIAM.GetAsBool(),
IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(),
StorageType: Params.CommonCfg.StorageType.GetValue(),
Region: Params.MinioCfg.Region.GetValue(),
UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(),
CloudProvider: Params.MinioCfg.CloudProvider.GetValue(),
RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(),
}
var (
factory = &mockFactory{
chunkMgr: &mockChunkmgr{},
}
ctx = context.TODO()
)
s.in = NewIndexNode(ctx, factory)
err = s.in.Init()
s.NoError(err)
err = s.in.Start()
s.NoError(err)
s.cm, err = s.in.storageFactory.NewChunkManager(context.Background(), s.storageConfig)
s.NoError(err)
logID := int64(10000)
for i, blob := range s.data {
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
filePath, err := binlog.BuildLogPath(storage.InsertBinlog, s.collID, s.partID, s.segID, fID, logID+int64(i))
s.NoError(err)
err = s.cm.Write(context.Background(), filePath, blob.GetValue())
s.NoError(err)
}
}
func (s *IndexNodeSuite) TearDownSuite() {
err := s.cm.RemoveWithPrefix(context.Background(), "indexnode-ut")
s.NoError(err)
Params.MinioCfg.RootPath.SwapTempValue("files")
err = s.in.Stop()
s.NoError(err)
}
func (s *IndexNodeSuite) Test_CreateIndexJob_Compatibility() {
s.Run("create vec index", func() {
ctx := context.Background()
s.Run("v2.3.x", func() {
buildID := int64(1)
dataPath, err := binlog.BuildLogPath(storage.InsertBinlog, s.collID, s.partID, s.segID, s.fieldID, s.logID+13)
s.NoError(err)
req := &indexpb.CreateJobRequest{
ClusterID: "cluster1",
IndexFilePrefix: "indexnode-ut/index_files",
BuildID: buildID,
DataPaths: []string{dataPath},
IndexVersion: 1,
StorageConfig: s.storageConfig,
IndexParams: []*commonpb.KeyValuePair{
{
Key: "index_type", Value: "HNSW",
},
{
Key: "metric_type", Value: "L2",
},
{
Key: "M", Value: "4",
},
{
Key: "efConstruction", Value: "16",
},
},
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim", Value: "8",
},
},
NumRows: 1025,
}
status, err := s.in.CreateJob(ctx, req)
s.NoError(err)
err = merr.Error(status)
s.NoError(err)
for {
resp, err := s.in.QueryJobs(ctx, &indexpb.QueryJobsRequest{
ClusterID: "cluster1",
BuildIDs: []int64{buildID},
})
s.NoError(err)
err = merr.Error(resp.GetStatus())
s.NoError(err)
s.Equal(1, len(resp.GetIndexInfos()))
if resp.GetIndexInfos()[0].GetState() == commonpb.IndexState_Finished {
break
}
require.Equal(s.T(), resp.GetIndexInfos()[0].GetState(), commonpb.IndexState_InProgress)
time.Sleep(time.Second)
}
status, err = s.in.DropJobs(ctx, &indexpb.DropJobsRequest{
ClusterID: "cluster1",
BuildIDs: []int64{buildID},
})
s.NoError(err)
err = merr.Error(status)
s.NoError(err)
})
s.Run("v2.4.x", func() {
buildID := int64(2)
req := &indexpb.CreateJobRequest{
ClusterID: "cluster1",
IndexFilePrefix: "indexnode-ut/index_files",
BuildID: buildID,
DataPaths: nil,
IndexVersion: 1,
StorageConfig: s.storageConfig,
IndexParams: []*commonpb.KeyValuePair{
{
Key: "index_type", Value: "HNSW",
},
{
Key: "metric_type", Value: "L2",
},
{
Key: "M", Value: "4",
},
{
Key: "efConstruction", Value: "16",
},
},
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim", Value: "8",
},
},
NumRows: 1025,
CurrentIndexVersion: 0,
CollectionID: s.collID,
PartitionID: s.partID,
SegmentID: s.segID,
FieldID: s.fieldID,
FieldName: "floatVector",
FieldType: schemapb.DataType_FloatVector,
Dim: 8,
DataIds: []int64{s.logID + 13},
}
status, err := s.in.CreateJob(ctx, req)
s.NoError(err)
err = merr.Error(status)
s.NoError(err)
for {
resp, err := s.in.QueryJobs(ctx, &indexpb.QueryJobsRequest{
ClusterID: "cluster1",
BuildIDs: []int64{buildID},
})
s.NoError(err)
err = merr.Error(resp.GetStatus())
s.NoError(err)
s.Equal(1, len(resp.GetIndexInfos()))
if resp.GetIndexInfos()[0].GetState() == commonpb.IndexState_Finished {
break
}
require.Equal(s.T(), resp.GetIndexInfos()[0].GetState(), commonpb.IndexState_InProgress)
time.Sleep(time.Second)
}
status, err = s.in.DropJobs(ctx, &indexpb.DropJobsRequest{
ClusterID: "cluster1",
BuildIDs: []int64{buildID},
})
s.NoError(err)
err = merr.Error(status)
s.NoError(err)
})
s.Run("v2.5.x", func() {
buildID := int64(3)
req := &indexpb.CreateJobRequest{
ClusterID: "cluster1",
IndexFilePrefix: "indexnode-ut/index_files",
BuildID: buildID,
IndexVersion: 1,
StorageConfig: s.storageConfig,
IndexParams: []*commonpb.KeyValuePair{
{
Key: "index_type", Value: "HNSW",
},
{
Key: "metric_type", Value: "L2",
},
{
Key: "M", Value: "4",
},
{
Key: "efConstruction", Value: "16",
},
},
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim", Value: "8",
},
},
NumRows: 1025,
CurrentIndexVersion: 0,
CollectionID: s.collID,
PartitionID: s.partID,
SegmentID: s.segID,
FieldID: s.fieldID,
FieldName: "floatVector",
FieldType: schemapb.DataType_FloatVector,
Dim: 8,
DataIds: []int64{s.logID + 13},
Field: &schemapb.FieldSchema{
FieldID: s.fieldID,
Name: "floatVector",
DataType: schemapb.DataType_FloatVector,
},
}
status, err := s.in.CreateJob(ctx, req)
s.NoError(err)
err = merr.Error(status)
s.NoError(err)
for {
resp, err := s.in.QueryJobs(ctx, &indexpb.QueryJobsRequest{
ClusterID: "cluster1",
BuildIDs: []int64{buildID},
})
s.NoError(err)
err = merr.Error(resp.GetStatus())
s.NoError(err)
s.Equal(1, len(resp.GetIndexInfos()))
if resp.GetIndexInfos()[0].GetState() == commonpb.IndexState_Finished {
break
}
require.Equal(s.T(), resp.GetIndexInfos()[0].GetState(), commonpb.IndexState_InProgress)
time.Sleep(time.Second)
}
status, err = s.in.DropJobs(ctx, &indexpb.DropJobsRequest{
ClusterID: "cluster1",
BuildIDs: []int64{buildID},
})
s.NoError(err)
err = merr.Error(status)
s.NoError(err)
})
})
}
func (s *IndexNodeSuite) Test_CreateIndexJob_ScalarIndex() {
ctx := context.Background()
s.Run("int64 inverted", func() {
buildID := int64(10)
fieldID := int64(13)
dataPath, err := binlog.BuildLogPath(storage.InsertBinlog, s.collID, s.partID, s.segID, s.fieldID, s.logID+13)
s.NoError(err)
req := &indexpb.CreateJobRequest{
ClusterID: "cluster1",
IndexFilePrefix: "indexnode-ut/index_files",
BuildID: buildID,
DataPaths: []string{dataPath},
IndexVersion: 1,
StorageConfig: s.storageConfig,
IndexParams: []*commonpb.KeyValuePair{
{
Key: "index_type", Value: "INVERTED",
},
},
TypeParams: nil,
NumRows: 1025,
DataIds: []int64{s.logID + 13},
Field: &schemapb.FieldSchema{
FieldID: fieldID,
Name: "int64",
DataType: schemapb.DataType_Int64,
},
}
status, err := s.in.CreateJob(ctx, req)
s.NoError(err)
err = merr.Error(status)
s.NoError(err)
for {
resp, err := s.in.QueryJobs(ctx, &indexpb.QueryJobsRequest{
ClusterID: "cluster1",
BuildIDs: []int64{buildID},
})
s.NoError(err)
err = merr.Error(resp.GetStatus())
s.NoError(err)
s.Equal(1, len(resp.GetIndexInfos()))
if resp.GetIndexInfos()[0].GetState() == commonpb.IndexState_Finished {
break
}
require.Equal(s.T(), resp.GetIndexInfos()[0].GetState(), commonpb.IndexState_InProgress)
time.Sleep(time.Second)
}
status, err = s.in.DropJobs(ctx, &indexpb.DropJobsRequest{
ClusterID: "cluster1",
BuildIDs: []int64{buildID},
})
s.NoError(err)
err = merr.Error(status)
s.NoError(err)
})
}
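The query-poll-drop loop above is repeated verbatim in each sub-test; a hypothetical helper in the same spirit (not part of this diff) would keep the pattern in one place:

// Hypothetical helper: poll QueryJobs until the single build finishes.
func (s *IndexNodeSuite) waitJobFinished(ctx context.Context, buildID int64) {
	for {
		resp, err := s.in.QueryJobs(ctx, &indexpb.QueryJobsRequest{
			ClusterID: "cluster1",
			BuildIDs:  []int64{buildID},
		})
		s.NoError(err)
		s.NoError(merr.Error(resp.GetStatus()))
		s.Equal(1, len(resp.GetIndexInfos()))
		if resp.GetIndexInfos()[0].GetState() == commonpb.IndexState_Finished {
			return
		}
		require.Equal(s.T(), commonpb.IndexState_InProgress, resp.GetIndexInfos()[0].GetState())
		time.Sleep(time.Second)
	}
}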

View File

@ -19,7 +19,6 @@ package indexnode
import (
"context"
"fmt"
"runtime/debug"
"strconv"
"strings"
"time"
@ -36,8 +35,6 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -61,16 +58,12 @@ type taskInfo struct {
failReason string
currentIndexVersion int32
indexStoreVersion int64
// task statistics
statistic *indexpb.JobInfo
}
type task interface {
Ctx() context.Context
Name() string
Prepare(context.Context) error
LoadData(context.Context) error
BuildIndex(context.Context) error
SaveIndexFiles(context.Context) error
OnEnqueue(context.Context) error
@ -83,37 +76,47 @@ type indexBuildTaskV2 struct {
*indexBuildTask
}
func (it *indexBuildTaskV2) parseParams(ctx context.Context) error {
it.collectionID = it.req.GetCollectionID()
it.partitionID = it.req.GetPartitionID()
it.segmentID = it.req.GetSegmentID()
it.fieldType = it.req.GetFieldType()
if it.fieldType == schemapb.DataType_None {
it.fieldType = it.req.GetField().GetDataType()
func newIndexBuildTaskV2(ctx context.Context,
cancel context.CancelFunc,
req *indexpb.CreateJobRequest,
node *IndexNode,
) *indexBuildTaskV2 {
t := &indexBuildTaskV2{
indexBuildTask: &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.GetClusterID(), req.GetBuildID()),
cancel: cancel,
ctx: ctx,
req: req,
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.GetBuildID(), req.GetClusterID())),
node: node,
},
}
it.fieldID = it.req.GetFieldID()
if it.fieldID == 0 {
it.fieldID = it.req.GetField().GetFieldID()
t.parseParams()
return t
}
func (it *indexBuildTaskV2) parseParams() {
// fill field for requests before v2.5.0
if it.req.GetField() == nil || it.req.GetField().GetDataType() == schemapb.DataType_None {
it.req.Field = &schemapb.FieldSchema{
FieldID: it.req.GetFieldID(),
Name: it.req.GetFieldName(),
DataType: it.req.GetFieldType(),
}
}
it.fieldName = it.req.GetFieldName()
if it.fieldName == "" {
it.fieldName = it.req.GetField().GetName()
}
return nil
}
func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
err := it.parseParams(ctx)
if err != nil {
log.Ctx(ctx).Warn("parse field meta from binlog failed", zap.Error(err))
return err
}
log := log.Ctx(ctx).With(zap.String("clusterID", it.req.GetClusterID()), zap.Int64("buildID", it.req.GetBuildID()),
zap.Int64("collection", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID()),
zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion()))
indexType := it.newIndexParams[common.IndexTypeKey]
if indexType == indexparamcheck.IndexDISKANN {
// check index node support disk index
if !Params.IndexNodeCfg.EnableDisk.GetAsBool() {
log.Ctx(ctx).Warn("IndexNode don't support build disk index",
log.Warn("IndexNode don't support build disk index",
zap.String("index type", it.newIndexParams[common.IndexTypeKey]),
zap.Bool("enable disk", Params.IndexNodeCfg.EnableDisk.GetAsBool()))
return merr.WrapErrIndexNotSupported("disk index")
@ -122,19 +125,19 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
// check load size and size of field data
localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("IndexNode get local used size failed")
log.Warn("IndexNode get local used size failed")
return err
}
fieldDataSize, err := estimateFieldDataSize(it.statistic.Dim, it.req.GetNumRows(), it.fieldType)
fieldDataSize, err := estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
if err != nil {
log.Ctx(ctx).Warn("IndexNode get local used size failed")
log.Warn("IndexNode get local used size failed")
return err
}
usedLocalSizeWhenBuild := int64(float64(fieldDataSize)*diskUsageRatio) + localUsedSize
maxUsedLocalSize := int64(Params.IndexNodeCfg.DiskCapacityLimit.GetAsFloat() * Params.IndexNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
if usedLocalSizeWhenBuild > maxUsedLocalSize {
log.Ctx(ctx).Warn("IndexNode don't has enough disk size to build disk ann index",
log.Warn("IndexNode don't has enough disk size to build disk ann index",
zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild),
zap.Int64("maxUsedLocalSize", maxUsedLocalSize))
return merr.WrapErrServiceDiskLimitExceeded(float32(usedLocalSizeWhenBuild), float32(maxUsedLocalSize))
@ -142,7 +145,7 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
err = indexparams.SetDiskIndexBuildParams(it.newIndexParams, int64(fieldDataSize))
if err != nil {
log.Ctx(ctx).Warn("failed to fill disk index params", zap.Error(err))
log.Warn("failed to fill disk index params", zap.Error(err))
return err
}
}
@ -174,29 +177,19 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
})
}
it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
field := it.req.GetField()
if field == nil || field.GetDataType() == schemapb.DataType_None {
field = &schemapb.FieldSchema{
FieldID: it.fieldID,
Name: it.fieldName,
DataType: it.fieldType,
}
}
buildIndexParams := &indexcgopb.BuildIndexInfo{
ClusterID: it.ClusterID,
BuildID: it.BuildID,
CollectionID: it.collectionID,
PartitionID: it.partitionID,
SegmentID: it.segmentID,
ClusterID: it.req.GetClusterID(),
BuildID: it.req.GetBuildID(),
CollectionID: it.req.GetCollectionID(),
PartitionID: it.req.GetPartitionID(),
SegmentID: it.req.GetSegmentID(),
IndexVersion: it.req.GetIndexVersion(),
CurrentIndexVersion: it.currentIndexVersion,
CurrentIndexVersion: it.req.GetCurrentIndexVersion(),
NumRows: it.req.GetNumRows(),
Dim: it.req.GetDim(),
IndexFilePrefix: it.req.GetIndexFilePrefix(),
InsertFiles: it.req.GetDataPaths(),
FieldSchema: field,
FieldSchema: it.req.GetField(),
StorageConfig: storageConfig,
IndexParams: mapToKVPairs(it.newIndexParams),
TypeParams: mapToKVPairs(it.newTypeParams),
@ -206,33 +199,36 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
OptFields: optFields,
}
var err error
it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexParams)
if err != nil {
if it.index != nil && it.index.CleanLocalData() != nil {
log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
zap.Int64("buildID", it.BuildID),
zap.Int64("index version", it.req.GetIndexVersion()))
log.Warn("failed to clean cached data on disk after build index failed")
}
log.Ctx(ctx).Error("failed to build index", zap.Error(err))
log.Warn("failed to build index", zap.Error(err))
return err
}
buildIndexLatency := it.tr.RecordSpan()
metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(buildIndexLatency.Milliseconds()))
log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
log.Info("Successfully build index")
return nil
}
func (it *indexBuildTaskV2) SaveIndexFiles(ctx context.Context) error {
log := log.Ctx(ctx).With(zap.String("clusterID", it.req.GetClusterID()), zap.Int64("buildID", it.req.GetBuildID()),
zap.Int64("collection", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID()),
zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion()))
gcIndex := func() {
if err := it.index.Delete(); err != nil {
log.Ctx(ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
log.Warn("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
}
}
version, err := it.index.UpLoadV2()
if err != nil {
log.Ctx(ctx).Error("failed to upload index", zap.Error(err))
log.Warn("failed to upload index", zap.Error(err))
gcIndex()
return err
}
@ -244,17 +240,15 @@ func (it *indexBuildTaskV2) SaveIndexFiles(ctx context.Context) error {
gcIndex()
// use serialized size before encoding
it.serializedSize = 0
var serializedSize uint64
saveFileKeys := make([]string, 0)
it.statistic.EndTime = time.Now().UnixMicro()
it.node.storeIndexFilesAndStatisticV2(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic, it.currentIndexVersion, version)
log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys))
it.node.storeIndexFilesAndStatisticV2(it.req.GetClusterID(), it.req.GetBuildID(), saveFileKeys, serializedSize, it.req.GetCurrentIndexVersion(), version)
log.Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys))
saveIndexFileDur := it.tr.RecordSpan()
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(saveIndexFileDur.Seconds())
it.tr.Elapse("index building all done")
log.Ctx(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID),
zap.Int64("partition", it.partitionID), zap.Int64("SegmentId", it.segmentID))
log.Info("Successfully save index files")
return nil
}
@ -264,29 +258,45 @@ type indexBuildTask struct {
cancel context.CancelFunc
ctx context.Context
cm storage.ChunkManager
index indexcgowrapper.CodecIndex
savePaths []string
req *indexpb.CreateJobRequest
currentIndexVersion int32
BuildID UniqueID
nodeID UniqueID
ClusterID string
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
fieldID UniqueID
fieldName string
fieldType schemapb.DataType
fieldData storage.FieldData
indexBlobs []*storage.Blob
newTypeParams map[string]string
newIndexParams map[string]string
serializedSize uint64
tr *timerecord.TimeRecorder
queueDur time.Duration
statistic indexpb.JobInfo
node *IndexNode
cm storage.ChunkManager
index indexcgowrapper.CodecIndex
req *indexpb.CreateJobRequest
newTypeParams map[string]string
newIndexParams map[string]string
tr *timerecord.TimeRecorder
queueDur time.Duration
node *IndexNode
}
func newIndexBuildTask(ctx context.Context,
cancel context.CancelFunc,
req *indexpb.CreateJobRequest,
cm storage.ChunkManager,
node *IndexNode,
) *indexBuildTask {
t := &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.GetClusterID(), req.GetBuildID()),
cancel: cancel,
ctx: ctx,
cm: cm,
req: req,
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.GetBuildID(), req.GetClusterID())),
node: node,
}
t.parseParams()
return t
}
func (it *indexBuildTask) parseParams() {
// fill field for requests before v2.5.0
if it.req.GetField() == nil || it.req.GetField().GetDataType() == schemapb.DataType_None {
it.req.Field = &schemapb.FieldSchema{
FieldID: it.req.GetFieldID(),
Name: it.req.GetFieldName(),
DataType: it.req.GetFieldType(),
}
}
}
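The compatibility fill is easiest to see with a pre-v2.5.0 request, which carries the field as flat FieldID/FieldName/FieldType values instead of a FieldSchema. A sketch (the ctx/cancel/cm/node values are stand-ins):

// Sketch: a v2.4-style request without req.Field; parseParams synthesizes it.
req := &indexpb.CreateJobRequest{
	FieldID:   102,
	FieldName: "floatVector",
	FieldType: schemapb.DataType_FloatVector,
}
t := newIndexBuildTask(ctx, cancel, req, cm, node)
// After construction, t.req.GetField() is non-nil and carries the same
// ID/name/type, so the rest of the pipeline can rely on req.Field uniformly.
_ = t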
func (it *indexBuildTask) Reset() {
@ -295,10 +305,7 @@ func (it *indexBuildTask) Reset() {
it.ctx = nil
it.cm = nil
it.index = nil
it.savePaths = nil
it.req = nil
it.fieldData = nil
it.indexBlobs = nil
it.newTypeParams = nil
it.newIndexParams = nil
it.tr = nil
@ -316,27 +323,27 @@ func (it *indexBuildTask) Name() string {
}
func (it *indexBuildTask) SetState(state commonpb.IndexState, failReason string) {
it.node.storeTaskState(it.ClusterID, it.BuildID, state, failReason)
it.node.storeTaskState(it.req.GetClusterID(), it.req.GetBuildID(), state, failReason)
}
func (it *indexBuildTask) GetState() commonpb.IndexState {
return it.node.loadTaskState(it.ClusterID, it.BuildID)
return it.node.loadTaskState(it.req.GetClusterID(), it.req.GetBuildID())
}
// OnEnqueue enqueues indexing tasks.
func (it *indexBuildTask) OnEnqueue(ctx context.Context) error {
it.queueDur = 0
it.tr.RecordSpan()
it.statistic.StartTime = time.Now().UnixMicro()
it.statistic.PodID = it.node.GetNodeID()
log.Ctx(ctx).Info("IndexNode IndexBuilderTask Enqueue", zap.Int64("buildID", it.BuildID), zap.Int64("segmentID", it.segmentID))
log.Ctx(ctx).Info("IndexNode IndexBuilderTask Enqueue", zap.Int64("buildID", it.req.GetBuildID()),
zap.Int64("segmentID", it.req.GetSegmentID()))
return nil
}
func (it *indexBuildTask) Prepare(ctx context.Context) error {
it.queueDur = it.tr.RecordSpan()
log.Ctx(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
log.Ctx(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.req.GetBuildID()),
zap.Int64("Collection", it.req.GetCollectionID()), zap.Int64("SegmentID", it.req.GetSegmentID()))
typeParams := make(map[string]string)
indexParams := make(map[string]string)
@ -377,86 +384,41 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
it.newTypeParams = typeParams
it.newIndexParams = indexParams
it.statistic.IndexParams = it.req.GetIndexParams()
it.statistic.Dim = it.req.GetDim()
if it.req.GetDim() == 0 {
// fill dim for requests before v2.4.0
if dimStr, ok := typeParams[common.DimKey]; ok {
var err error
it.req.Dim, err = strconv.ParseInt(dimStr, 10, 64)
if err != nil {
log.Ctx(ctx).Error("parse dimesion failed", zap.Error(err))
// ignore error
}
}
}
log.Ctx(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
if it.req.GetCollectionID() == 0 {
err := it.parseFieldMetaFromBinlog(ctx)
if err != nil {
log.Ctx(ctx).Warn("parse field meta from binlog failed", zap.Error(err))
return err
}
}
log.Ctx(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.req.GetBuildID()),
zap.Int64("collectionID", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID()))
return nil
}
func (it *indexBuildTask) LoadData(ctx context.Context) error {
getValueByPath := func(path string) ([]byte, error) {
data, err := it.cm.Read(ctx, path)
if err != nil {
if errors.Is(err, merr.ErrIoKeyNotFound) {
return nil, err
}
return nil, err
}
return data, nil
}
getBlobByPath := func(path string) (*Blob, error) {
value, err := getValueByPath(path)
if err != nil {
return nil, err
}
return &Blob{
Key: path,
Value: value,
}, nil
}
toLoadDataPaths := it.req.GetDataPaths()
keys := make([]string, len(toLoadDataPaths))
blobs := make([]*Blob, len(toLoadDataPaths))
loadKey := func(idx int) error {
keys[idx] = toLoadDataPaths[idx]
blob, err := getBlobByPath(toLoadDataPaths[idx])
if err != nil {
return err
}
blobs[idx] = blob
return nil
}
// Use hardware.GetCPUNum() instead of runtime.NumCPU()
// to respect the CPU quota of the container/pod.
// GOMAXPROCS is set by `automaxprocs`, so passing 0 just retrieves the value.
err := funcutil.ProcessFuncParallel(len(toLoadDataPaths), hardware.GetCPUNum(), loadKey, "loadKey")
if err != nil {
log.Ctx(ctx).Warn("loadKey failed", zap.Error(err))
return err
}
loadFieldDataLatency := it.tr.CtxRecord(ctx, "load field data done")
metrics.IndexNodeLoadFieldLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(loadFieldDataLatency.Seconds())
err = it.decodeBlobs(ctx, blobs)
if err != nil {
log.Ctx(ctx).Info("failed to decode blobs", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID), zap.Error(err))
} else {
log.Ctx(ctx).Info("Successfully load data", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
}
blobs = nil
debug.FreeOSMemory()
return err
}
func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
err := it.parseFieldMetaFromBinlog(ctx)
if err != nil {
log.Ctx(ctx).Warn("parse field meta from binlog failed", zap.Error(err))
return err
}
log := log.Ctx(ctx).With(zap.String("clusterID", it.req.GetClusterID()), zap.Int64("buildID", it.req.GetBuildID()),
zap.Int64("collection", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID()),
zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion()))
indexType := it.newIndexParams[common.IndexTypeKey]
if indexType == indexparamcheck.IndexDISKANN {
// check index node support disk index
if !Params.IndexNodeCfg.EnableDisk.GetAsBool() {
log.Ctx(ctx).Warn("IndexNode don't support build disk index",
log.Warn("IndexNode don't support build disk index",
zap.String("index type", it.newIndexParams[common.IndexTypeKey]),
zap.Bool("enable disk", Params.IndexNodeCfg.EnableDisk.GetAsBool()))
return errors.New("index node don't support build disk index")
@ -465,19 +427,19 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
// check load size and size of field data
localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("IndexNode get local used size failed")
log.Warn("IndexNode get local used size failed")
return err
}
fieldDataSize, err := estimateFieldDataSize(it.statistic.Dim, it.req.GetNumRows(), it.fieldType)
fieldDataSize, err := estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
if err != nil {
log.Ctx(ctx).Warn("IndexNode get local used size failed")
log.Warn("IndexNode get local used size failed")
return err
}
usedLocalSizeWhenBuild := int64(float64(fieldDataSize)*diskUsageRatio) + localUsedSize
maxUsedLocalSize := int64(Params.IndexNodeCfg.DiskCapacityLimit.GetAsFloat() * Params.IndexNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
if usedLocalSizeWhenBuild > maxUsedLocalSize {
log.Ctx(ctx).Warn("IndexNode don't has enough disk size to build disk ann index",
log.Warn("IndexNode don't has enough disk size to build disk ann index",
zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild),
zap.Int64("maxUsedLocalSize", maxUsedLocalSize))
return errors.New("index node don't has enough disk size to build disk ann index")
@ -485,7 +447,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
err = indexparams.SetDiskIndexBuildParams(it.newIndexParams, int64(fieldDataSize))
if err != nil {
log.Ctx(ctx).Warn("failed to fill disk index params", zap.Error(err))
log.Warn("failed to fill disk index params", zap.Error(err))
return err
}
}
@ -517,28 +479,19 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
})
}
it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
field := it.req.GetField()
if field == nil || field.GetDataType() == schemapb.DataType_None {
field = &schemapb.FieldSchema{
FieldID: it.fieldID,
Name: it.fieldName,
DataType: it.fieldType,
}
}
buildIndexParams := &indexcgopb.BuildIndexInfo{
ClusterID: it.ClusterID,
BuildID: it.BuildID,
CollectionID: it.collectionID,
PartitionID: it.partitionID,
SegmentID: it.segmentID,
ClusterID: it.req.GetClusterID(),
BuildID: it.req.GetBuildID(),
CollectionID: it.req.GetCollectionID(),
PartitionID: it.req.GetPartitionID(),
SegmentID: it.req.GetSegmentID(),
IndexVersion: it.req.GetIndexVersion(),
CurrentIndexVersion: it.currentIndexVersion,
CurrentIndexVersion: it.req.GetCurrentIndexVersion(),
NumRows: it.req.GetNumRows(),
Dim: it.req.GetDim(),
IndexFilePrefix: it.req.GetIndexFilePrefix(),
InsertFiles: it.req.GetDataPaths(),
FieldSchema: field,
FieldSchema: it.req.GetField(),
StorageConfig: storageConfig,
IndexParams: mapToKVPairs(it.newIndexParams),
TypeParams: mapToKVPairs(it.newTypeParams),
@ -548,33 +501,37 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
OptFields: optFields,
}
log.Info("debug create index", zap.Any("buildIndexParams", buildIndexParams))
var err error
it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)
if err != nil {
if it.index != nil && it.index.CleanLocalData() != nil {
log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
zap.Int64("buildID", it.BuildID),
zap.Int64("index version", it.req.GetIndexVersion()))
log.Warn("failed to clean cached data on disk after build index failed")
}
log.Ctx(ctx).Error("failed to build index", zap.Error(err))
log.Warn("failed to build index", zap.Error(err))
return err
}
buildIndexLatency := it.tr.RecordSpan()
metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(buildIndexLatency.Seconds())
log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID), zap.Int32("currentIndexVersion", it.currentIndexVersion))
log.Info("Successfully build index")
return nil
}
func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
log := log.Ctx(ctx).With(zap.String("clusterID", it.req.GetClusterID()), zap.Int64("buildID", it.req.GetBuildID()),
zap.Int64("collection", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID()),
zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion()))
gcIndex := func() {
if err := it.index.Delete(); err != nil {
log.Ctx(ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
log.Warn("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
}
}
indexFilePath2Size, err := it.index.UpLoad()
if err != nil {
log.Ctx(ctx).Error("failed to upload index", zap.Error(err))
log.Warn("failed to upload index", zap.Error(err))
gcIndex()
return err
}
@ -585,27 +542,26 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
gcIndex()
// use serialized size before encoding
it.serializedSize = 0
var serializedSize uint64
saveFileKeys := make([]string, 0)
for filePath, fileSize := range indexFilePath2Size {
it.serializedSize += uint64(fileSize)
serializedSize += uint64(fileSize)
parts := strings.Split(filePath, "/")
fileKey := parts[len(parts)-1]
saveFileKeys = append(saveFileKeys, fileKey)
}
it.statistic.EndTime = time.Now().UnixMicro()
it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic, it.currentIndexVersion)
log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys))
it.node.storeIndexFilesAndStatistic(it.req.GetClusterID(), it.req.GetBuildID(), saveFileKeys, serializedSize, it.req.GetCurrentIndexVersion())
log.Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys))
saveIndexFileDur := it.tr.RecordSpan()
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(saveIndexFileDur.Seconds())
it.tr.Elapse("index building all done")
log.Ctx(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID),
zap.Int64("partition", it.partitionID), zap.Int64("SegmentId", it.segmentID))
log.Info("Successfully save index files")
return nil
}
func (it *indexBuildTask) parseFieldMetaFromBinlog(ctx context.Context) error {
// fill collectionID, partitionID... for requests before v2.4.0
toLoadDataPaths := it.req.GetDataPaths()
if len(toLoadDataPaths) == 0 {
return merr.WrapErrParameterInvalidMsg("data insert path must be not empty")
@ -627,51 +583,17 @@ func (it *indexBuildTask) parseFieldMetaFromBinlog(ctx context.Context) error {
return merr.WrapErrParameterInvalidMsg("we expect only one field in deserialized insert data")
}
it.collectionID = collectionID
it.partitionID = partitionID
it.segmentID = segmentID
for fID, value := range insertData.Data {
it.fieldType = value.GetDataType()
it.fieldID = fID
break
it.req.CollectionID = collectionID
it.req.PartitionID = partitionID
it.req.SegmentID = segmentID
if it.req.GetField().GetFieldID() == 0 {
for fID, value := range insertData.Data {
it.req.Field.DataType = value.GetDataType()
it.req.Field.FieldID = fID
break
}
}
it.req.CurrentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
return nil
}
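parseFieldMetaFromBinlog backfills identifiers that pre-v2.4.0 requests did not carry; Prepare invokes it only when the request has no CollectionID (see the Prepare hunk above). A condensed sketch of that flow:

// Sketch: legacy requests omit collection/partition/segment IDs, so they are
// recovered from the first insert binlog (mirrors the check in Prepare).
if it.req.GetCollectionID() == 0 {
	if err := it.parseFieldMetaFromBinlog(ctx); err != nil {
		return err
	}
}
// From here on, req carries CollectionID/PartitionID/SegmentID and a filled Field.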
func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob) error {
var insertCodec storage.InsertCodec
collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(blobs)
if err2 != nil {
return err2
}
metrics.IndexNodeDecodeFieldLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(it.tr.RecordSpan().Seconds())
if len(insertData.Data) != 1 {
return merr.WrapErrParameterInvalidMsg("we expect only one field in deserialized insert data")
}
it.collectionID = collectionID
it.partitionID = partitionID
it.segmentID = segmentID
deserializeDur := it.tr.RecordSpan()
log.Ctx(ctx).Info("IndexNode deserialize data success",
zap.Int64("collectionID", it.collectionID),
zap.Int64("partitionID", it.partitionID),
zap.Int64("segmentID", it.segmentID),
zap.Duration("deserialize duration", deserializeDur))
// we can ensure that these blobs are in one Field
var data storage.FieldData
var fieldID storage.FieldID
for fID, value := range insertData.Data {
data = value
fieldID = fID
break
}
it.statistic.NumRows = int64(data.RowNum())
it.fieldID = fieldID
it.fieldData = data
return nil
}

View File

@ -36,184 +36,8 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
// import (
// "context"
// "github.com/cockroachdb/errors"
// "math/rand"
// "path"
// "strconv"
// "testing"
// "github.com/milvus-io/milvus/internal/kv"
// "github.com/golang/protobuf/proto"
// etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
// "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
// "github.com/milvus-io/milvus/internal/proto/indexpb"
// "github.com/milvus-io/milvus/internal/storage"
// "github.com/milvus-io/milvus/pkg/util/etcd"
// "github.com/milvus-io/milvus/pkg/util/timerecord"
// "github.com/stretchr/testify/assert"
// )
// func TestIndexBuildTask_saveIndexMeta(t *testing.T) {
// Params.Init()
// etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
// assert.NoError(t, err)
// assert.NotNil(t, etcdCli)
// etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
// assert.NotNil(t, etcdKV)
// indexBuildID := rand.Int63()
// indexMeta := &indexpb.IndexMeta{
// IndexBuildID: indexBuildID,
// State: commonpb.IndexState_InProgress,
// NodeID: 1,
// IndexVersion: 1,
// }
// metaPath := path.Join("indexes", strconv.FormatInt(indexMeta.IndexBuildID, 10))
// metaValue, err := proto.Marshal(indexMeta)
// assert.NoError(t, err)
// err = etcdKV.Save(metaPath, string(metaValue))
// assert.NoError(t, err)
// indexBuildTask := &IndexBuildTask{
// BaseTask: BaseTask{
// internalErr: errors.New("internal err"),
// },
// etcdKV: etcdKV,
// req: &indexpb.CreateIndexRequest{
// IndexBuildID: indexBuildID,
// Version: 1,
// MetaPath: metaPath,
// },
// tr: &timerecord.TimeRecorder{},
// }
// err = indexBuildTask.saveIndexMeta(context.Background())
// assert.NoError(t, err)
// indexMeta2, _, err := indexBuildTask.loadIndexMeta(context.Background())
// assert.NoError(t, err)
// assert.NotNil(t, indexMeta2)
// assert.Equal(t, commonpb.IndexState_Unissued, indexMeta2.State)
// err = etcdKV.Remove(metaPath)
// assert.NoError(t, err)
// }
// type mockChunkManager struct {
// storage.ChunkManager
// read func(key string) ([]byte, error)
// }
// func (mcm *mockChunkManager) Read(key string) ([]byte, error) {
// return mcm.read(key)
// }
// func TestIndexBuildTask_Execute(t *testing.T) {
// t.Run("task retry", func(t *testing.T) {
// indexTask := &IndexBuildTask{
// cm: &mockChunkManager{
// read: func(key string) ([]byte, error) {
// return nil, errors.New("error occurred")
// },
// },
// req: &indexpb.CreateIndexRequest{
// IndexBuildID: 1,
// DataPaths: []string{"path1", "path2"},
// },
// }
// err := indexTask.Execute(context.Background())
// assert.Error(t, err)
// assert.Equal(t, TaskStateRetry, indexTask.state)
// })
// t.Run("task failed", func(t *testing.T) {
// indexTask := &IndexBuildTask{
// cm: &mockChunkManager{
// read: func(key string) ([]byte, error) {
// return nil, ErrNoSuchKey
// },
// },
// req: &indexpb.CreateIndexRequest{
// IndexBuildID: 1,
// DataPaths: []string{"path1", "path2"},
// },
// }
// err := indexTask.Execute(context.Background())
// assert.ErrorIs(t, err, ErrNoSuchKey)
// assert.Equal(t, TaskStateFailed, indexTask.state)
// })
// }
// type mockETCDKV struct {
// kv.MetaKv
// loadWithPrefix2 func(key string) ([]string, []string, []int64, error)
// }
// func TestIndexBuildTask_loadIndexMeta(t *testing.T) {
// t.Run("load empty meta", func(t *testing.T) {
// indexTask := &IndexBuildTask{
// etcdKV: &mockETCDKV{
// loadWithPrefix2: func(key string) ([]string, []string, []int64, error) {
// return []string{}, []string{}, []int64{}, nil
// },
// },
// req: &indexpb.CreateIndexRequest{
// IndexBuildID: 1,
// DataPaths: []string{"path1", "path2"},
// },
// }
// indexMeta, revision, err := indexTask.loadIndexMeta(context.Background())
// assert.NoError(t, err)
// assert.Equal(t, int64(0), revision)
// assert.Equal(t, TaskStateAbandon, indexTask.GetState())
// indexTask.updateTaskState(indexMeta, nil)
// assert.Equal(t, TaskStateAbandon, indexTask.GetState())
// })
// }
// func TestIndexBuildTask_saveIndex(t *testing.T) {
// t.Run("save index failed", func(t *testing.T) {
// indexTask := &IndexBuildTask{
// etcdKV: &mockETCDKV{
// loadWithPrefix2: func(key string) ([]string, []string, []int64, error) {
// return []string{}, []string{}, []int64{}, errors.New("error")
// },
// },
// partitionID: 1,
// segmentID: 1,
// req: &indexpb.CreateIndexRequest{
// IndexBuildID: 1,
// DataPaths: []string{"path1", "path2"},
// Version: 1,
// },
// }
// blobs := []*storage.Blob{
// {
// Key: "key1",
// Value: []byte("value1"),
// },
// {
// Key: "key2",
// Value: []byte("value2"),
// },
// }
// err := indexTask.saveIndex(context.Background(), blobs)
// assert.Error(t, err)
// })
// }
type IndexBuildTaskV2Suite struct {
suite.Suite
schema *schemapb.CollectionSchema
@ -283,14 +107,12 @@ func (suite *IndexBuildTaskV2Suite) TestBuildIndex() {
RootPath: "/tmp/milvus/data",
StorageType: "local",
},
CollectionID: 1,
PartitionID: 1,
SegmentID: 1,
Field: &schemapb.FieldSchema{
FieldID: 3,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
},
CollectionID: 1,
PartitionID: 1,
SegmentID: 1,
FieldID: 3,
FieldName: "vec",
FieldType: schemapb.DataType_FloatVector,
StorePath: "file://" + suite.space.Path(),
StoreVersion: suite.space.GetCurrentVersion(),
IndexStorePath: "file://" + suite.space.Path(),
@ -300,17 +122,7 @@ func (suite *IndexBuildTaskV2Suite) TestBuildIndex() {
},
}
task := &indexBuildTaskV2{
indexBuildTask: &indexBuildTask{
ident: "test",
ctx: context.Background(),
BuildID: req.GetBuildID(),
ClusterID: req.GetClusterID(),
req: req,
tr: timerecord.NewTimeRecorder("test"),
node: NewIndexNode(context.Background(), dependency.NewDefaultFactory(true)),
},
}
task := newIndexBuildTaskV2(context.Background(), nil, req, NewIndexNode(context.Background(), dependency.NewDefaultFactory(true)))
var err error
err = task.Prepare(context.Background())

View File

@ -4,11 +4,9 @@ import (
"context"
"time"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
)
@ -61,7 +59,6 @@ func (i *IndexNode) storeIndexFilesAndStatistic(
buildID UniqueID,
fileKeys []string,
serializedSize uint64,
statistic *indexpb.JobInfo,
currentIndexVersion int32,
) {
key := taskKey{ClusterID: ClusterID, BuildID: buildID}
@ -70,7 +67,6 @@ func (i *IndexNode) storeIndexFilesAndStatistic(
if info, ok := i.tasks[key]; ok {
info.fileKeys = common.CloneStringList(fileKeys)
info.serializedSize = serializedSize
info.statistic = proto.Clone(statistic).(*indexpb.JobInfo)
info.currentIndexVersion = currentIndexVersion
return
}
@ -81,7 +77,6 @@ func (i *IndexNode) storeIndexFilesAndStatisticV2(
buildID UniqueID,
fileKeys []string,
serializedSize uint64,
statistic *indexpb.JobInfo,
currentIndexVersion int32,
indexStoreVersion int64,
) {
@ -91,7 +86,6 @@ func (i *IndexNode) storeIndexFilesAndStatisticV2(
if info, ok := i.tasks[key]; ok {
info.fileKeys = common.CloneStringList(fileKeys)
info.serializedSize = serializedSize
info.statistic = proto.Clone(statistic).(*indexpb.JobInfo)
info.currentIndexVersion = currentIndexVersion
info.indexStoreVersion = indexStoreVersion
return