Estimate the memory size of the index before building the index (#12973)

Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2021-12-09 14:19:40 +08:00 committed by GitHub
parent cec42cb19a
commit 342200ce13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 540 additions and 122 deletions

View File

@ -21,6 +21,8 @@ import (
"errors"
"testing"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/mock"
"google.golang.org/grpc"
@ -29,7 +31,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
)
@ -169,7 +170,8 @@ func TestIndexNodeClient(t *testing.T) {
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
assert.Nil(t, err)
resp, err := inc.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)

View File

@ -20,12 +20,13 @@ import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
)
@ -75,9 +76,8 @@ func TestIndexNodeServer(t *testing.T) {
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{
Request: "",
}
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
assert.Nil(t, err)
resp, err := server.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)

View File

@ -181,7 +181,7 @@ func TestGrpcService(t *testing.T) {
var binlogLock sync.Mutex
binlogPathArray := make([]string, 0, 16)
core.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
core.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error) {
binlogLock.Lock()
defer binlogLock.Unlock()
binlogPathArray = append(binlogPathArray, binlog...)

View File

@ -354,13 +354,6 @@ func (i *IndexCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.String
// the task is recorded in Meta. The background process assignTaskLoop will find this task and assign it to IndexNode for
// execution.
func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
log.Debug("IndexCoord building index ...",
zap.Int64("IndexBuildID", req.IndexBuildID),
zap.String("IndexName = ", req.IndexName),
zap.Int64("IndexID = ", req.IndexID),
zap.Strings("DataPath = ", req.DataPaths),
zap.Any("TypeParams", req.TypeParams),
zap.Any("IndexParams", req.IndexParams))
if !i.isHealthy() {
errMsg := "IndexCoord is not healthy"
err := errors.New(errMsg)
@ -372,6 +365,15 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ
},
}, err
}
log.Debug("IndexCoord building index ...",
zap.Int64("IndexBuildID", req.IndexBuildID),
zap.String("IndexName = ", req.IndexName),
zap.Int64("IndexID = ", req.IndexID),
zap.Strings("DataPath = ", req.DataPaths),
zap.Any("TypeParams", req.TypeParams),
zap.Any("IndexParams", req.IndexParams),
zap.Int64("numRow", req.NumRows),
zap.Any("field type", req.FieldSchema.DataType))
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "IndexCoord-BuildIndex")
defer sp.Finish()
hasIndex, indexBuildID := i.metaTable.HasSameReq(req)
@ -837,7 +839,8 @@ func (i *IndexCoord) assignTaskLoop() {
continue
}
log.Debug("The version of the task has been updated", zap.Int64("indexBuildID", indexBuildID))
nodeID, builderClient := i.nodeManager.PeekClient()
nodeID, builderClient := i.nodeManager.PeekClient(meta)
if builderClient == nil {
log.Warn("IndexCoord assignmentTasksLoop can not find available IndexNode")
break

View File

@ -23,6 +23,8 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
@ -97,6 +99,16 @@ func TestIndexCoord(t *testing.T) {
req := &indexpb.BuildIndexRequest{
IndexID: indexID,
DataPaths: []string{"DataPath-1", "DataPath-2"},
NumRows: 0,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
FieldSchema: &schemapb.FieldSchema{
DataType: schemapb.DataType_FloatVector,
},
}
resp, err := ic.BuildIndex(ctx, req)
assert.Nil(t, err)

View File

@ -20,6 +20,8 @@ import (
"strconv"
"testing"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/golang/protobuf/proto"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -37,8 +39,12 @@ func TestMetaTable(t *testing.T) {
IndexName: "test_index",
IndexID: 0,
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}},
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
NumRows: 100,
FieldSchema: &schemapb.FieldSchema{
DataType: schemapb.DataType_FloatVector,
},
}
indexMeta1 := &indexpb.IndexMeta{
IndexBuildID: 1,
@ -170,8 +176,12 @@ func TestMetaTable(t *testing.T) {
IndexName: "test_index",
IndexID: 2,
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}},
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
NumRows: 100,
FieldSchema: &schemapb.FieldSchema{
DataType: schemapb.DataType_FloatVector,
},
}
err = metaTable.AddIndex(6, req2)
@ -182,8 +192,12 @@ func TestMetaTable(t *testing.T) {
IndexName: "test_index",
IndexID: 3,
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "dim", Value: "256"}},
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
NumRows: 10,
FieldSchema: &schemapb.FieldSchema{
DataType: schemapb.DataType_FloatVector,
},
}
has, err := metaTable.HasSameReq(req3)
@ -246,8 +260,12 @@ func TestMetaTable(t *testing.T) {
IndexName: "test_index",
IndexID: 4,
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
NumRows: 10,
FieldSchema: &schemapb.FieldSchema{
DataType: schemapb.DataType_FloatVector,
},
}
err = metaTable.AddIndex(7, req4)
assert.Nil(t, err)
@ -269,8 +287,12 @@ func TestMetaTable(t *testing.T) {
IndexName: "test_index",
IndexID: 5,
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
NumRows: 10,
FieldSchema: &schemapb.FieldSchema{
DataType: schemapb.DataType_FloatVector,
},
}
err = metaTable.AddIndex(req5.IndexBuildID, req5)

View File

@ -20,6 +20,8 @@ import (
"context"
"sync"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
@ -40,12 +42,31 @@ type NodeManager struct {
func NewNodeManager() *NodeManager {
return &NodeManager{
nodeClients: make(map[UniqueID]types.IndexNode),
pq: &PriorityQueue{},
lock: sync.RWMutex{},
pq: &PriorityQueue{
policy: PeekClientV1,
},
lock: sync.RWMutex{},
}
}
func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) {
func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) error {
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
if err != nil {
log.Error("create metrics request failed", zap.Error(err))
return err
}
metrics, err := client.GetMetrics(context.Background(), req)
if err != nil {
log.Error("get indexnode metrics failed", zap.Error(err))
return err
}
infos := &metricsinfo.IndexNodeInfos{}
err = metricsinfo.UnmarshalComponentInfos(metrics.Response, infos)
if err != nil {
log.Error("get indexnode metrics info failed", zap.Error(err))
return err
}
nm.lock.Lock()
defer nm.lock.Unlock()
@ -55,9 +76,11 @@ func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) {
key: nodeID,
priority: 0,
weight: 0,
totalMem: infos.HardwareInfos.Memory,
}
nm.nodeClients[nodeID] = client
nm.pq.Push(item)
return nil
}
// RemoveNode removes the unused client of IndexNode.
@ -88,16 +111,27 @@ func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error {
log.Error("IndexCoord NodeManager", zap.Any("Add node err", err))
return err
}
nm.setClient(nodeID, nodeClient)
return nil
return nm.setClient(nodeID, nodeClient)
}
// PeekClient peeks the client with the least load.
func (nm *NodeManager) PeekClient() (UniqueID, types.IndexNode) {
func (nm *NodeManager) PeekClient(meta Meta) (UniqueID, types.IndexNode) {
nm.lock.Lock()
defer nm.lock.Unlock()
nodeID := nm.pq.Peek()
log.Debug("IndexCoord NodeManager PeekClient")
dim, err := getDimension(meta.indexMeta.Req)
if err != nil {
log.Error(err.Error())
return UniqueID(-1), nil
}
indexSize, err := estimateIndexSize(dim, meta.indexMeta.Req.NumRows, meta.indexMeta.Req.FieldSchema.DataType)
if err != nil {
log.Warn(err.Error())
return UniqueID(-1), nil
}
nodeID := nm.pq.Peek(indexSize, meta.indexMeta.Req.IndexParams, meta.indexMeta.Req.TypeParams)
client, ok := nm.nodeClients[nodeID]
if !ok {
log.Error("IndexCoord NodeManager PeekClient", zap.Any("There is no IndexNode client corresponding to NodeID", nodeID))

View File

@ -0,0 +1,37 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexcoord
import "github.com/milvus-io/milvus/internal/proto/commonpb"
type PeekClientPolicy func(memorySize uint64, indexParams []*commonpb.KeyValuePair,
typeParams []*commonpb.KeyValuePair, pq *PriorityQueue) UniqueID
func PeekClientV0(memorySize uint64, indexParams []*commonpb.KeyValuePair,
typeParams []*commonpb.KeyValuePair, pq *PriorityQueue) UniqueID {
return pq.items[0].key
}
func PeekClientV1(memorySize uint64, indexParams []*commonpb.KeyValuePair,
typeParams []*commonpb.KeyValuePair, pq *PriorityQueue) UniqueID {
for i := range pq.items {
if pq.items[i].totalMem > memorySize {
return pq.items[i].key
}
}
return UniqueID(-1)
}

View File

@ -0,0 +1,39 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexcoord
import (
"testing"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/stretchr/testify/assert"
)
func TestPeekClientV0(t *testing.T) {
pq := newPriorityQueue()
key := PeekClientV0(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}, pq)
assert.Equal(t, UniqueID(0), key)
}
func TestPeekClientV1(t *testing.T) {
pq := newPriorityQueue()
key := PeekClientV1(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}, pq)
assert.Equal(t, UniqueID(0), key)
key2 := PeekClientV1(10000, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{}, pq)
assert.Equal(t, UniqueID(-1), key2)
}

View File

@ -19,6 +19,8 @@ package indexcoord
import (
"container/heap"
"sync"
"github.com/milvus-io/milvus/internal/proto/commonpb"
)
// PQItem is something we manage in a priority queue.
@ -30,12 +32,15 @@ type PQItem struct {
weight int // The weight of the item in the queue.
// When the priority is the same, a smaller weight is more preferred.
index int // The index of the item in the heap.
totalMem uint64 // The total memory of the IndexNode.
}
// PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue struct {
items []*PQItem
lock sync.RWMutex
items []*PQItem
lock sync.RWMutex
policy PeekClientPolicy
}
// Len is the length of the priority queue.
@ -139,14 +144,14 @@ func (pq *PriorityQueue) Remove(key UniqueID) {
}
// Peek picks an key with the lowest load.
func (pq *PriorityQueue) Peek() UniqueID {
func (pq *PriorityQueue) Peek(memorySize uint64, indexParams []*commonpb.KeyValuePair, typeParams []*commonpb.KeyValuePair) UniqueID {
pq.lock.RLock()
defer pq.lock.RUnlock()
if pq.Len() == 0 {
return UniqueID(-1)
}
return pq.items[0].key
return pq.policy(memorySize, indexParams, typeParams, pq)
}
// PeekAll return the key of all the items.

View File

@ -20,18 +20,23 @@ import (
"container/heap"
"testing"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/stretchr/testify/assert"
)
const QueueLen = 10
func newPriorityQueue() *PriorityQueue {
ret := &PriorityQueue{}
ret := &PriorityQueue{
policy: PeekClientV0,
}
for i := 0; i < QueueLen; i++ {
item := &PQItem{
key: UniqueID(i),
priority: i,
index: i,
totalMem: 1000,
}
ret.items = append(ret.items, item)
}
@ -76,7 +81,7 @@ func TestPriorityQueue_UpdatePriority(t *testing.T) {
pq := newPriorityQueue()
key := UniqueID(pq.Len() / 2)
pq.UpdatePriority(key, -pq.Len())
peekKey := pq.Peek()
peekKey := pq.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{})
assert.Equal(t, key, peekKey)
}
@ -84,37 +89,40 @@ func TestPriorityQueue_IncPriority(t *testing.T) {
pq := newPriorityQueue()
key := UniqueID(pq.Len() / 2)
pq.IncPriority(key, -pq.Len())
peekKey := pq.Peek()
peekKey := pq.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{})
assert.Equal(t, key, peekKey)
}
func TestPriorityQueue(t *testing.T) {
ret := &PriorityQueue{}
ret := &PriorityQueue{
policy: PeekClientV0,
}
for i := 0; i < 4; i++ {
item := &PQItem{
key: UniqueID(i),
priority: 0,
index: i,
totalMem: 1000,
}
ret.items = append(ret.items, item)
}
heap.Init(ret)
peeKey1 := ret.Peek()
peeKey1 := ret.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{})
assert.Equal(t, int64(0), peeKey1)
ret.IncPriority(peeKey1, 1)
peeKey2 := ret.Peek()
peeKey2 := ret.Peek(100, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{})
assert.Equal(t, int64(1), peeKey2)
ret.IncPriority(peeKey2, 1)
ret.IncPriority(peeKey1, -1)
ret.IncPriority(peeKey2, -1)
peeKey1 = ret.Peek()
peeKey1 = ret.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{})
assert.Equal(t, int64(3), peeKey1)
ret.IncPriority(peeKey1, 1)
peeKey2 = ret.Peek()
peeKey2 = ret.Peek(10, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{})
assert.Equal(t, int64(2), peeKey2)
}

View File

@ -0,0 +1,58 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexcoord
import (
"errors"
"strconv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
func getDimension(req *indexpb.BuildIndexRequest) (int64, error) {
for _, kvPair := range req.GetTypeParams() {
key, value := kvPair.GetKey(), kvPair.GetValue()
if key == "dim" {
dim, err := strconv.ParseInt(value, 10, 64)
if err != nil {
errMsg := "dimension is invalid"
log.Error(errMsg)
return 0, errors.New(errMsg)
}
return dim, nil
}
}
errMsg := "dimension is not in type params"
log.Error(errMsg)
return 0, errors.New(errMsg)
}
func estimateIndexSize(dim int64, numRows int64, dataType schemapb.DataType) (uint64, error) {
if dataType == schemapb.DataType_FloatVector {
return uint64(dim) * uint64(numRows) * 4, nil
}
if dataType == schemapb.DataType_BinaryVector {
return uint64(dim) / 8 * uint64(numRows), nil
}
errMsg := "the field to build index must be a vector field"
log.Error(errMsg)
return 0, errors.New(errMsg)
}

View File

@ -0,0 +1,84 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexcoord
import (
"testing"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
)
func Test_getDimension(t *testing.T) {
req := &indexpb.BuildIndexRequest{
IndexBuildID: UniqueID(0),
IndexID: UniqueID(1),
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
}
dim, err := getDimension(req)
assert.Equal(t, int64(128), dim)
assert.Nil(t, err)
req2 := &indexpb.BuildIndexRequest{
IndexBuildID: UniqueID(0),
IndexID: UniqueID(1),
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "one",
},
},
}
dim, err = getDimension(req2)
assert.Error(t, err)
assert.Equal(t, int64(0), dim)
req3 := &indexpb.BuildIndexRequest{
IndexBuildID: UniqueID(0),
IndexID: UniqueID(1),
TypeParams: []*commonpb.KeyValuePair{
{
Key: "TypeParam-Key-1",
Value: "TypeParam-Value-1",
},
},
}
dim, err = getDimension(req3)
assert.Error(t, err)
assert.Equal(t, int64(0), dim)
}
func Test_estimateIndexSize(t *testing.T) {
memorySize, err := estimateIndexSize(10, 100, schemapb.DataType_FloatVector)
assert.Nil(t, err)
assert.Equal(t, uint64(4000), memorySize)
memorySize, err = estimateIndexSize(16, 100, schemapb.DataType_BinaryVector)
assert.Nil(t, err)
assert.Equal(t, uint64(200), memorySize)
memorySize, err = estimateIndexSize(10, 100, schemapb.DataType_Float)
assert.Error(t, err)
assert.Equal(t, uint64(0), memorySize)
}

View File

@ -269,10 +269,11 @@ func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateInde
ctx: ctx2,
done: make(chan error),
},
req: request,
kv: i.kv,
etcdKV: i.etcdKV,
nodeID: Params.NodeID,
req: request,
kv: i.kv,
etcdKV: i.etcdKV,
nodeID: Params.NodeID,
serializedSize: 0,
}
ret := &commonpb.Status{

View File

@ -289,12 +289,74 @@ func (inm *Mock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
}, nil
}
metricType, _ := metricsinfo.ParseMetricType(req.Request)
if metricType == metricsinfo.SystemInfoMetrics {
metrics, err := getMockSystemInfoMetrics(ctx, req, inm)
log.Debug("IndexNode.GetMetrics",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.String("metric_type", metricType),
zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
zap.Error(err))
return metrics, nil
}
log.Warn("IndexNode.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.String("metric_type", metricType))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: metricsinfo.MsgUnimplementedMetric,
},
Response: "",
}, nil
}
func getMockSystemInfoMetrics(
ctx context.Context,
req *milvuspb.GetMetricsRequest,
node *Mock,
) (*milvuspb.GetMetricsResponse, error) {
// TODO(dragondriver): add more metrics
nodeInfos := metricsinfo.IndexNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, Params.NodeID),
HardwareInfos: metricsinfo.HardwareMetrics{
CPUCoreCount: metricsinfo.GetCPUCoreCount(false),
CPUCoreUsage: metricsinfo.GetCPUUsage(),
Memory: metricsinfo.GetMemoryCount(),
MemoryUsage: metricsinfo.GetUsedMemoryCount(),
Disk: metricsinfo.GetDiskCount(),
DiskUsage: metricsinfo.GetDiskUsage(),
},
SystemInfo: metricsinfo.DeployMetrics{},
CreatedTime: Params.CreatedTime.String(),
UpdatedTime: Params.UpdatedTime.String(),
Type: typeutil.IndexNodeRole,
},
SystemConfigurations: metricsinfo.IndexNodeConfiguration{
MinioBucketName: Params.MinioBucketName,
SimdType: Params.SimdType,
},
}
metricsinfo.FillDeployMetricsWithEnv(&nodeInfos.SystemInfo)
resp, _ := metricsinfo.MarshalComponentInfos(nodeInfos)
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Response: "",
ComponentName: "IndexNode",
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, Params.NodeID),
}, nil
}

View File

@ -21,6 +21,8 @@ import (
"strconv"
"testing"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -73,12 +75,17 @@ func TestIndexNodeMock(t *testing.T) {
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{
Request: "",
}
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
assert.Nil(t, err)
resp, err := inm.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
req2, err := metricsinfo.ConstructRequestByMetricType("IndexNode")
assert.Nil(t, err)
resp2, err := inm.GetMetrics(ctx, req2)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp2.Status.ErrorCode)
})
err = inm.Stop()

View File

@ -102,12 +102,13 @@ func (bt *BaseTask) Notify(err error) {
// IndexBuildTask is used to record the information of the index tasks.
type IndexBuildTask struct {
BaseTask
index Index
kv kv.BaseKV
etcdKV *etcdkv.EtcdKV
savePaths []string
req *indexpb.CreateIndexRequest
nodeID UniqueID
index Index
kv kv.BaseKV
etcdKV *etcdkv.EtcdKV
savePaths []string
req *indexpb.CreateIndexRequest
nodeID UniqueID
serializedSize uint64
}
// Ctx is the context of index tasks.
@ -182,6 +183,7 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
}
indexMeta.IndexFilePaths = it.savePaths
indexMeta.State = commonpb.IndexState_Finished
indexMeta.SerializeSize = it.serializedSize
// Under normal circumstances, it.err and it.internalErr will not be non-nil at the same time, but for the sake of insurance, the else judgment is added.
if it.err != nil {
log.Error("IndexNode CreateIndex failed and can not be retried", zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Any("err", it.err))
@ -407,6 +409,11 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
return err
}
tr.Record("serialize index codec done")
it.serializedSize = 0
for i := range serializedIndexBlobs {
it.serializedSize += uint64(len(serializedIndexBlobs[i].Value))
}
log.Debug("serialize index codec done", zap.Uint64("serialized index size", it.serializedSize))
getSavePathByKey := func(key string) string {

View File

@ -7,6 +7,7 @@ option go_package = "github.com/milvus-io/milvus/internal/proto/indexpb";
import "common.proto";
import "internal.proto";
import "milvus.proto";
import "schema.proto";
service IndexCoord {
rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {}
@ -77,6 +78,8 @@ message BuildIndexRequest {
repeated string data_paths = 5;
repeated common.KeyValuePair type_params = 6;
repeated common.KeyValuePair index_params = 7;
int64 num_rows = 8;
schema.FieldSchema field_schema = 9;
}
message BuildIndexResponse {
@ -109,6 +112,7 @@ message IndexMeta {
int64 nodeID = 7;
int64 version = 8;
bool recycled = 9;
uint64 serialize_size = 10;
}
message DropIndexRequest {

View File

@ -10,6 +10,7 @@ import (
commonpb "github.com/milvus-io/milvus/internal/proto/commonpb"
internalpb "github.com/milvus-io/milvus/internal/proto/internalpb"
milvuspb "github.com/milvus-io/milvus/internal/proto/milvuspb"
schemapb "github.com/milvus-io/milvus/internal/proto/schemapb"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
@ -388,6 +389,8 @@ type BuildIndexRequest struct {
DataPaths []string `protobuf:"bytes,5,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"`
TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,6,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"`
IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,7,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"`
NumRows int64 `protobuf:"varint,8,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"`
FieldSchema *schemapb.FieldSchema `protobuf:"bytes,9,opt,name=field_schema,json=fieldSchema,proto3" json:"field_schema,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -460,6 +463,20 @@ func (m *BuildIndexRequest) GetIndexParams() []*commonpb.KeyValuePair {
return nil
}
func (m *BuildIndexRequest) GetNumRows() int64 {
if m != nil {
return m.NumRows
}
return 0
}
func (m *BuildIndexRequest) GetFieldSchema() *schemapb.FieldSchema {
if m != nil {
return m.FieldSchema
}
return nil
}
type BuildIndexResponse struct {
Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
IndexBuildID int64 `protobuf:"varint,2,opt,name=indexBuildID,proto3" json:"indexBuildID,omitempty"`
@ -658,6 +675,7 @@ type IndexMeta struct {
NodeID int64 `protobuf:"varint,7,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
Version int64 `protobuf:"varint,8,opt,name=version,proto3" json:"version,omitempty"`
Recycled bool `protobuf:"varint,9,opt,name=recycled,proto3" json:"recycled,omitempty"`
SerializeSize uint64 `protobuf:"varint,10,opt,name=serialize_size,json=serializeSize,proto3" json:"serialize_size,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -751,6 +769,13 @@ func (m *IndexMeta) GetRecycled() bool {
return false
}
func (m *IndexMeta) GetSerializeSize() uint64 {
if m != nil {
return m.SerializeSize
}
return 0
}
type DropIndexRequest struct {
IndexID int64 `protobuf:"varint,1,opt,name=indexID,proto3" json:"indexID,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -809,68 +834,74 @@ func init() {
func init() { proto.RegisterFile("index_coord.proto", fileDescriptor_f9e019eb3fda53c2) }
var fileDescriptor_f9e019eb3fda53c2 = []byte{
// 975 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0x5f, 0x6f, 0x1b, 0x45,
0x10, 0xcf, 0xf9, 0x12, 0x3b, 0x1e, 0x87, 0xa8, 0x59, 0x4a, 0x75, 0xb8, 0x54, 0x75, 0x8e, 0x52,
0x0c, 0x6a, 0x9d, 0xca, 0xa5, 0xf0, 0x84, 0x04, 0x89, 0x45, 0x64, 0xa1, 0x54, 0xd1, 0x36, 0xe2,
0x01, 0x09, 0x59, 0x1b, 0xdf, 0x24, 0x59, 0xf5, 0xfe, 0xe5, 0x76, 0x5d, 0x91, 0x77, 0xde, 0x79,
0x2b, 0xe2, 0x93, 0xf0, 0x39, 0xfa, 0xcc, 0x97, 0x41, 0xbb, 0xb7, 0x77, 0xb9, 0x3b, 0x9f, 0x53,
0x87, 0x50, 0x78, 0xe1, 0xed, 0x66, 0xf6, 0x37, 0x33, 0x3b, 0xbf, 0x9d, 0xfd, 0xdd, 0xc2, 0x16,
0x0f, 0x3d, 0xfc, 0x79, 0x32, 0x8d, 0xa2, 0xc4, 0x1b, 0xc4, 0x49, 0x24, 0x23, 0x42, 0x02, 0xee,
0xbf, 0x9a, 0x89, 0xd4, 0x1a, 0xe8, 0xf5, 0xee, 0xc6, 0x34, 0x0a, 0x82, 0x28, 0x4c, 0x7d, 0xdd,
0x4d, 0x1e, 0x4a, 0x4c, 0x42, 0xe6, 0x1b, 0x7b, 0xa3, 0x18, 0xe1, 0xfe, 0x66, 0xc1, 0xfb, 0x14,
0x4f, 0xb9, 0x90, 0x98, 0x3c, 0x8f, 0x3c, 0xa4, 0x78, 0x3e, 0x43, 0x21, 0xc9, 0x13, 0x58, 0x3d,
0x66, 0x02, 0x1d, 0xab, 0x67, 0xf5, 0x3b, 0xc3, 0x8f, 0x06, 0xa5, 0x32, 0x26, 0xff, 0x81, 0x38,
0xdd, 0x65, 0x02, 0xa9, 0x46, 0x92, 0x2f, 0xa1, 0xc5, 0x3c, 0x2f, 0x41, 0x21, 0x9c, 0xc6, 0x15,
0x41, 0xdf, 0xa6, 0x18, 0x9a, 0x81, 0xc9, 0x1d, 0x68, 0x86, 0x91, 0x87, 0xe3, 0x91, 0x63, 0xf7,
0xac, 0xbe, 0x4d, 0x8d, 0xe5, 0xfe, 0x6a, 0xc1, 0xed, 0xf2, 0xce, 0x44, 0x1c, 0x85, 0x02, 0xc9,
0x53, 0x68, 0x0a, 0xc9, 0xe4, 0x4c, 0x98, 0xcd, 0xdd, 0xad, 0xad, 0xf3, 0x42, 0x43, 0xa8, 0x81,
0x92, 0x5d, 0xe8, 0xf0, 0x90, 0xcb, 0x49, 0xcc, 0x12, 0x16, 0x64, 0x3b, 0xdc, 0x1e, 0x54, 0xd8,
0x33, 0x44, 0x8d, 0x43, 0x2e, 0x0f, 0x35, 0x90, 0x02, 0xcf, 0xbf, 0xdd, 0xaf, 0xe1, 0x83, 0x7d,
0x94, 0x63, 0xc5, 0xb1, 0xca, 0x8e, 0x22, 0x23, 0xeb, 0x01, 0xbc, 0xa7, 0x99, 0xdf, 0x9d, 0x71,
0xdf, 0x1b, 0x8f, 0xd4, 0xc6, 0xec, 0xbe, 0x4d, 0xcb, 0x4e, 0xf7, 0x0f, 0x0b, 0xda, 0x3a, 0x78,
0x1c, 0x9e, 0x44, 0xe4, 0x19, 0xac, 0xa9, 0xad, 0xa5, 0x0c, 0x6f, 0x0e, 0xef, 0xd7, 0x36, 0x71,
0x59, 0x8b, 0xa6, 0x68, 0xe2, 0xc2, 0x46, 0x31, 0xab, 0x6e, 0xc4, 0xa6, 0x25, 0x1f, 0x71, 0xa0,
0xa5, 0xed, 0x9c, 0xd2, 0xcc, 0x24, 0xf7, 0x00, 0xd2, 0x11, 0x0a, 0x59, 0x80, 0xce, 0x6a, 0xcf,
0xea, 0xb7, 0x69, 0x5b, 0x7b, 0x9e, 0xb3, 0x00, 0xd5, 0x51, 0x24, 0xc8, 0x44, 0x14, 0x3a, 0x6b,
0x7a, 0xc9, 0x58, 0xee, 0x2f, 0x16, 0xdc, 0xa9, 0x76, 0x7e, 0x93, 0xc3, 0x78, 0x96, 0x06, 0xa1,
0x3a, 0x07, 0xbb, 0xdf, 0x19, 0xde, 0x1b, 0xcc, 0x4f, 0xf1, 0x20, 0xa7, 0x8a, 0x1a, 0xb0, 0xfb,
0xa6, 0x01, 0x64, 0x2f, 0x41, 0x26, 0x51, 0xaf, 0x65, 0xec, 0x57, 0x29, 0xb1, 0x6a, 0x28, 0x29,
0x37, 0xde, 0xa8, 0x36, 0xbe, 0x98, 0x31, 0x07, 0x5a, 0xaf, 0x30, 0x11, 0x3c, 0x0a, 0x35, 0x5d,
0x36, 0xcd, 0x4c, 0x72, 0x17, 0xda, 0x01, 0x4a, 0x36, 0x89, 0x99, 0x3c, 0x33, 0x7c, 0xad, 0x2b,
0xc7, 0x21, 0x93, 0x67, 0xaa, 0x9e, 0xc7, 0xcc, 0xa2, 0x70, 0x9a, 0x3d, 0x5b, 0xd5, 0x53, 0x1e,
0xb5, 0xaa, 0xa7, 0x51, 0x5e, 0xc4, 0x98, 0x4d, 0x63, 0x4b, 0xb3, 0xb0, 0x5d, 0x4b, 0xdd, 0xf7,
0x78, 0xf1, 0x03, 0xf3, 0x67, 0x78, 0xc8, 0x78, 0x42, 0x41, 0x45, 0xa5, 0xd3, 0x48, 0x46, 0xa6,
0xed, 0x2c, 0xc9, 0xfa, 0xb2, 0x49, 0x3a, 0x3a, 0xcc, 0xcc, 0xf4, 0xef, 0x0d, 0xd8, 0x4a, 0x49,
0xfa, 0xd7, 0x28, 0x2d, 0x73, 0xb3, 0xf6, 0x16, 0x6e, 0x9a, 0xff, 0x04, 0x37, 0xad, 0xbf, 0xc5,
0x4d, 0x00, 0xa4, 0x48, 0xcd, 0x4d, 0x26, 0x7e, 0x89, 0x6b, 0xeb, 0x7e, 0x03, 0x4e, 0x76, 0xc9,
0xbe, 0xe3, 0x3e, 0x6a, 0x36, 0xae, 0xa7, 0x30, 0xaf, 0x2d, 0xd8, 0x2a, 0xc5, 0x6b, 0xa5, 0x79,
0x57, 0x1b, 0x26, 0x7d, 0xb8, 0x95, 0xb2, 0x7c, 0xc2, 0x7d, 0x34, 0xc7, 0x69, 0xeb, 0xe3, 0xdc,
0xe4, 0xa5, 0x2e, 0xd4, 0xc6, 0x3e, 0xac, 0xe9, 0xed, 0x26, 0x8c, 0x8e, 0x00, 0x0a, 0x65, 0x53,
0x1d, 0xf9, 0x64, 0xa1, 0x8e, 0x14, 0x09, 0xa1, 0xed, 0x93, 0x7c, 0x63, 0x7f, 0x36, 0x8c, 0x26,
0x1f, 0xa0, 0x64, 0x4b, 0x8d, 0x7d, 0xae, 0xdb, 0x8d, 0x6b, 0xe9, 0xf6, 0x7d, 0xe8, 0x9c, 0x30,
0xee, 0x4f, 0x8c, 0xbe, 0xda, 0xfa, 0xba, 0x80, 0x72, 0x51, 0xed, 0x21, 0x5f, 0x81, 0x9d, 0xe0,
0xb9, 0x16, 0x99, 0x05, 0x8d, 0xcc, 0x5d, 0x53, 0xaa, 0x22, 0x6a, 0x4f, 0x61, 0xad, 0xee, 0x14,
0xc8, 0x36, 0x6c, 0x04, 0x2c, 0x79, 0x39, 0xf1, 0xd0, 0x47, 0x89, 0x9e, 0xd3, 0xec, 0x59, 0xfd,
0x75, 0xda, 0x51, 0xbe, 0x51, 0xea, 0x2a, 0xfc, 0x8c, 0x5b, 0xc5, 0x9f, 0x71, 0x51, 0x06, 0xd7,
0xcb, 0x32, 0xd8, 0x85, 0xf5, 0x04, 0xa7, 0x17, 0x53, 0x1f, 0x3d, 0xa7, 0xad, 0x13, 0xe6, 0xb6,
0xfb, 0x08, 0x6e, 0x8d, 0x92, 0x28, 0x2e, 0x49, 0x4b, 0x41, 0x17, 0xac, 0x92, 0x2e, 0x0c, 0xdf,
0x34, 0x01, 0x34, 0x74, 0x4f, 0xbd, 0x6f, 0x48, 0x0c, 0x64, 0x1f, 0xe5, 0x5e, 0x14, 0xc4, 0x51,
0x88, 0xa1, 0x4c, 0xff, 0x3b, 0xe4, 0xc9, 0x82, 0x5f, 0xf6, 0x3c, 0xd4, 0x14, 0xec, 0x3e, 0x5c,
0x10, 0x51, 0x81, 0xbb, 0x2b, 0x24, 0xd0, 0x15, 0x8f, 0x78, 0x80, 0x47, 0x7c, 0xfa, 0x72, 0xef,
0x8c, 0x85, 0x21, 0xfa, 0x57, 0x55, 0xac, 0x40, 0xb3, 0x8a, 0x1f, 0x97, 0x23, 0x8c, 0xf1, 0x42,
0x26, 0x3c, 0x3c, 0xcd, 0x86, 0xde, 0x5d, 0x21, 0xe7, 0x70, 0x7b, 0x1f, 0x75, 0x75, 0x2e, 0x24,
0x9f, 0x8a, 0xac, 0xe0, 0x70, 0x71, 0xc1, 0x39, 0xf0, 0x35, 0x4b, 0xfe, 0x04, 0x70, 0x39, 0x45,
0x64, 0xb9, 0x29, 0x9b, 0x27, 0xb0, 0x0a, 0xcb, 0xd3, 0x73, 0xd8, 0x2c, 0x3f, 0x13, 0xc8, 0x67,
0x75, 0xb1, 0xb5, 0x8f, 0xa8, 0xee, 0xe7, 0xcb, 0x40, 0xf3, 0x52, 0x09, 0x6c, 0xcd, 0x09, 0x0a,
0x79, 0x74, 0x55, 0x8a, 0xaa, 0xa6, 0x76, 0x1f, 0x2f, 0x89, 0xce, 0x6b, 0x1e, 0x42, 0x3b, 0x1f,
0x67, 0xf2, 0xa0, 0x2e, 0xba, 0x3a, 0xed, 0xdd, 0xab, 0xa4, 0xcc, 0x5d, 0x21, 0x13, 0x80, 0x7d,
0x94, 0x07, 0x28, 0x13, 0x3e, 0x15, 0xe4, 0x61, 0xed, 0x21, 0x5e, 0x02, 0xb2, 0xa4, 0x9f, 0xbe,
0x15, 0x97, 0x6d, 0x79, 0xf8, 0x7a, 0xd5, 0xe8, 0x9b, 0x7a, 0x41, 0xff, 0x7f, 0xa5, 0xde, 0xc1,
0x95, 0x3a, 0x82, 0x4e, 0xe1, 0x4d, 0x4a, 0x6a, 0x2f, 0xcb, 0xfc, 0xa3, 0xf5, 0xbf, 0x1e, 0x8c,
0xdd, 0x2f, 0x7e, 0x1c, 0x9e, 0x72, 0x79, 0x36, 0x3b, 0x56, 0xa5, 0x77, 0x52, 0xe4, 0x63, 0x1e,
0x99, 0xaf, 0x9d, 0x8c, 0xa1, 0x1d, 0x9d, 0x69, 0x47, 0xb7, 0x11, 0x1f, 0x1f, 0x37, 0xb5, 0xf9,
0xf4, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x75, 0x9d, 0x20, 0xf1, 0x89, 0x0e, 0x00, 0x00,
// 1057 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0xcd, 0x6e, 0xdb, 0x46,
0x10, 0x36, 0x4d, 0x5b, 0x3f, 0x23, 0xc5, 0x88, 0xb7, 0x69, 0xc0, 0x28, 0x0d, 0x22, 0xb3, 0x49,
0xaa, 0x16, 0x89, 0x1c, 0x28, 0x4d, 0x7b, 0x2a, 0xd0, 0xda, 0x42, 0x0c, 0xa3, 0x70, 0x60, 0xac,
0x8d, 0x1e, 0x0a, 0x14, 0xc2, 0x5a, 0x1c, 0xd9, 0x8b, 0xf0, 0x47, 0xe6, 0xae, 0x92, 0xda, 0xc7,
0xa2, 0xf7, 0xde, 0xd2, 0x47, 0xe9, 0x73, 0xe4, 0x71, 0x7a, 0x2b, 0x76, 0xb9, 0xa4, 0x49, 0x89,
0x72, 0xe4, 0xba, 0x69, 0x2f, 0xbd, 0x71, 0x86, 0xdf, 0xec, 0xec, 0x7c, 0x3b, 0xf3, 0xed, 0xc2,
0x3a, 0x0f, 0x3d, 0xfc, 0x79, 0x30, 0x8c, 0xa2, 0xd8, 0xeb, 0x8e, 0xe3, 0x48, 0x46, 0x84, 0x04,
0xdc, 0x7f, 0x3d, 0x11, 0x89, 0xd5, 0xd5, 0xff, 0x5b, 0xcd, 0x61, 0x14, 0x04, 0x51, 0x98, 0xf8,
0x5a, 0x6b, 0x3c, 0x94, 0x18, 0x87, 0xcc, 0x37, 0x76, 0x33, 0x1f, 0xd1, 0x6a, 0x8a, 0xe1, 0x09,
0x06, 0x2c, 0xb1, 0xdc, 0xdf, 0x2d, 0xf8, 0x88, 0xe2, 0x31, 0x17, 0x12, 0xe3, 0x97, 0x91, 0x87,
0x14, 0x4f, 0x27, 0x28, 0x24, 0x79, 0x0a, 0x2b, 0x47, 0x4c, 0xa0, 0x63, 0xb5, 0xad, 0x4e, 0xa3,
0xf7, 0x49, 0xb7, 0x90, 0xd4, 0x64, 0xdb, 0x13, 0xc7, 0x5b, 0x4c, 0x20, 0xd5, 0x48, 0xf2, 0x15,
0x54, 0x99, 0xe7, 0xc5, 0x28, 0x84, 0xb3, 0x7c, 0x49, 0xd0, 0x77, 0x09, 0x86, 0xa6, 0x60, 0x72,
0x1b, 0x2a, 0x61, 0xe4, 0xe1, 0x6e, 0xdf, 0xb1, 0xdb, 0x56, 0xc7, 0xa6, 0xc6, 0x72, 0x7f, 0xb3,
0xe0, 0x56, 0x71, 0x67, 0x62, 0x1c, 0x85, 0x02, 0xc9, 0x33, 0xa8, 0x08, 0xc9, 0xe4, 0x44, 0x98,
0xcd, 0xdd, 0x2d, 0xcd, 0x73, 0xa0, 0x21, 0xd4, 0x40, 0xc9, 0x16, 0x34, 0x78, 0xc8, 0xe5, 0x60,
0xcc, 0x62, 0x16, 0xa4, 0x3b, 0xdc, 0xe8, 0x4e, 0x71, 0x69, 0x68, 0xdb, 0x0d, 0xb9, 0xdc, 0xd7,
0x40, 0x0a, 0x3c, 0xfb, 0x76, 0xbf, 0x81, 0x8f, 0x77, 0x50, 0xee, 0x2a, 0xc6, 0xd5, 0xea, 0x28,
0x52, 0xb2, 0x1e, 0xc0, 0x0d, 0x7d, 0x0e, 0x5b, 0x13, 0xee, 0x7b, 0xbb, 0x7d, 0xb5, 0x31, 0xbb,
0x63, 0xd3, 0xa2, 0xd3, 0xfd, 0xc3, 0x82, 0xba, 0x0e, 0xde, 0x0d, 0x47, 0x11, 0x79, 0x0e, 0xab,
0x6a, 0x6b, 0x09, 0xc3, 0x6b, 0xbd, 0xfb, 0xa5, 0x45, 0x5c, 0xe4, 0xa2, 0x09, 0x9a, 0xb8, 0xd0,
0xcc, 0xaf, 0xaa, 0x0b, 0xb1, 0x69, 0xc1, 0x47, 0x1c, 0xa8, 0x6a, 0x3b, 0xa3, 0x34, 0x35, 0xc9,
0x3d, 0x80, 0xa4, 0xa1, 0x42, 0x16, 0xa0, 0xb3, 0xd2, 0xb6, 0x3a, 0x75, 0x5a, 0xd7, 0x9e, 0x97,
0x2c, 0x40, 0x75, 0x14, 0x31, 0x32, 0x11, 0x85, 0xce, 0xaa, 0xfe, 0x65, 0x2c, 0xf7, 0x57, 0x0b,
0x6e, 0x4f, 0x57, 0x7e, 0x9d, 0xc3, 0x78, 0x9e, 0x04, 0xa1, 0x3a, 0x07, 0xbb, 0xd3, 0xe8, 0xdd,
0xeb, 0xce, 0xf6, 0x74, 0x37, 0xa3, 0x8a, 0x1a, 0xb0, 0xfb, 0x6e, 0x19, 0xc8, 0x76, 0x8c, 0x4c,
0xa2, 0xfe, 0x97, 0xb2, 0x3f, 0x4d, 0x89, 0x55, 0x42, 0x49, 0xb1, 0xf0, 0xe5, 0xe9, 0xc2, 0xe7,
0x33, 0xe6, 0x40, 0xf5, 0x35, 0xc6, 0x82, 0x47, 0xa1, 0xa6, 0xcb, 0xa6, 0xa9, 0x49, 0xee, 0x42,
0x3d, 0x40, 0xc9, 0x06, 0x63, 0x26, 0x4f, 0x0c, 0x5f, 0x35, 0xe5, 0xd8, 0x67, 0xf2, 0x44, 0xe5,
0xf3, 0x98, 0xf9, 0x29, 0x9c, 0x4a, 0xdb, 0x56, 0xf9, 0x94, 0x47, 0xfd, 0xd5, 0xdd, 0x28, 0xcf,
0xc6, 0x98, 0x76, 0x63, 0x55, 0xb3, 0xb0, 0x51, 0x4a, 0xdd, 0xf7, 0x78, 0xf6, 0x03, 0xf3, 0x27,
0xb8, 0xcf, 0x78, 0x4c, 0x41, 0x45, 0x25, 0xdd, 0x48, 0xfa, 0xa6, 0xec, 0x74, 0x91, 0xda, 0xa2,
0x8b, 0x34, 0x74, 0x98, 0xe9, 0xe9, 0x3f, 0x97, 0x61, 0x3d, 0x21, 0xe9, 0x5f, 0xa3, 0xb4, 0xc8,
0xcd, 0xea, 0x7b, 0xb8, 0xa9, 0xfc, 0x13, 0xdc, 0x54, 0xff, 0x0e, 0x37, 0xe4, 0x0e, 0xd4, 0xc2,
0x49, 0x30, 0x88, 0xa3, 0x37, 0x8a, 0x5d, 0x5d, 0x43, 0x38, 0x09, 0x68, 0xf4, 0x46, 0x90, 0x6d,
0x68, 0x8e, 0x38, 0xfa, 0xde, 0x20, 0x11, 0x53, 0xa7, 0xae, 0x9b, 0xbf, 0x5d, 0x4c, 0x60, 0x84,
0xf6, 0x85, 0x02, 0x1e, 0xe8, 0x6f, 0xda, 0x18, 0x5d, 0x18, 0x6e, 0x00, 0x24, 0x4f, 0xfd, 0x75,
0x26, 0x6a, 0x01, 0x59, 0x70, 0xbf, 0x05, 0x27, 0x1d, 0xe2, 0x17, 0xdc, 0x47, 0xcd, 0xf6, 0xd5,
0x14, 0xec, 0xad, 0x05, 0xeb, 0x85, 0x78, 0xad, 0x64, 0x1f, 0x6a, 0xc3, 0xa4, 0x03, 0x37, 0x93,
0x53, 0x1c, 0x71, 0x1f, 0x4d, 0xbb, 0xd8, 0xba, 0x5d, 0xd6, 0x78, 0xa1, 0x0a, 0xb5, 0xb1, 0x3b,
0x25, 0xb5, 0x5d, 0x87, 0xd1, 0x3e, 0x40, 0x2e, 0x6d, 0xa2, 0x53, 0x0f, 0xe7, 0xea, 0x54, 0x9e,
0x10, 0x5a, 0x1f, 0x65, 0x1b, 0xfb, 0xc5, 0x36, 0x9a, 0xbf, 0x87, 0x92, 0x2d, 0x34, 0x56, 0xd9,
0xbd, 0xb0, 0x7c, 0xa5, 0x7b, 0xe1, 0x3e, 0x34, 0x46, 0x8c, 0xfb, 0x03, 0xa3, 0xdf, 0xb6, 0x1e,
0x47, 0x50, 0x2e, 0xaa, 0x3d, 0xe4, 0x6b, 0xb0, 0x63, 0x3c, 0xd5, 0x22, 0x36, 0xa7, 0x90, 0x19,
0x19, 0xa0, 0x2a, 0xa2, 0xf4, 0x14, 0x56, 0xcb, 0x4e, 0x81, 0x6c, 0x40, 0x33, 0x60, 0xf1, 0xab,
0x81, 0x87, 0x3e, 0x4a, 0xf4, 0x9c, 0x4a, 0xdb, 0xea, 0xd4, 0x68, 0x43, 0xf9, 0xfa, 0x89, 0x2b,
0x77, 0xd9, 0x57, 0xf3, 0x97, 0x7d, 0x5e, 0x66, 0x6b, 0x45, 0x99, 0x6d, 0x41, 0x2d, 0xc6, 0xe1,
0xd9, 0xd0, 0x47, 0x4f, 0x4f, 0x59, 0x8d, 0x66, 0x36, 0x79, 0x08, 0x6b, 0x02, 0x63, 0xce, 0x7c,
0x7e, 0x8e, 0x03, 0xc1, 0xcf, 0xd1, 0x81, 0xb6, 0xd5, 0x59, 0xa1, 0x37, 0x32, 0xef, 0x01, 0x3f,
0x47, 0xf7, 0x31, 0xdc, 0xec, 0xc7, 0xd1, 0xb8, 0xa0, 0x70, 0x39, 0x79, 0xb2, 0x0a, 0xf2, 0xd4,
0x7b, 0x57, 0x01, 0xd0, 0xd0, 0x6d, 0xf5, 0xe8, 0x22, 0x63, 0x20, 0x3b, 0x28, 0xb7, 0xa3, 0x60,
0x1c, 0x85, 0x18, 0xca, 0xe4, 0xfa, 0x23, 0x4f, 0xe7, 0xbc, 0x1c, 0x66, 0xa1, 0x26, 0x61, 0xeb,
0xd1, 0x9c, 0x88, 0x29, 0xb8, 0xbb, 0x44, 0x02, 0x9d, 0xf1, 0x90, 0x07, 0x78, 0xc8, 0x87, 0xaf,
0xb6, 0x4f, 0x58, 0x18, 0xa2, 0x7f, 0x59, 0xc6, 0x29, 0x68, 0x9a, 0xf1, 0xd3, 0x62, 0x84, 0x31,
0x0e, 0x64, 0xcc, 0xc3, 0xe3, 0x74, 0x36, 0xdc, 0x25, 0x72, 0x0a, 0xb7, 0x76, 0x50, 0x67, 0xe7,
0x42, 0xf2, 0xa1, 0x48, 0x13, 0xf6, 0xe6, 0x27, 0x9c, 0x01, 0x5f, 0x31, 0xe5, 0x4f, 0x00, 0x17,
0xcd, 0x46, 0x16, 0x6b, 0xc6, 0x59, 0x02, 0xa7, 0x61, 0xd9, 0xf2, 0x1c, 0xd6, 0x8a, 0xaf, 0x15,
0xf2, 0x79, 0x59, 0x6c, 0xe9, 0x5b, 0xae, 0xf5, 0xc5, 0x22, 0xd0, 0x2c, 0x55, 0x0c, 0xeb, 0x33,
0xba, 0x43, 0x1e, 0x5f, 0xb6, 0xc4, 0xb4, 0xf4, 0xb6, 0x9e, 0x2c, 0x88, 0xce, 0x72, 0xee, 0x43,
0x3d, 0x6b, 0x67, 0xf2, 0xa0, 0x2c, 0x7a, 0xba, 0xdb, 0x5b, 0x97, 0x29, 0x9e, 0xbb, 0x44, 0x06,
0x00, 0x3b, 0x28, 0xf7, 0x50, 0xc6, 0x7c, 0x28, 0xc8, 0xa3, 0xd2, 0x43, 0xbc, 0x00, 0xa4, 0x8b,
0x7e, 0xf6, 0x5e, 0x5c, 0xba, 0xe5, 0xde, 0xdb, 0x15, 0x23, 0x83, 0xea, 0x21, 0xff, 0xff, 0x48,
0x7d, 0x80, 0x91, 0x3a, 0x84, 0x46, 0xee, 0x69, 0x4c, 0x4a, 0x87, 0x65, 0xf6, 0xed, 0xfc, 0x5f,
0x37, 0xc6, 0xd6, 0x97, 0x3f, 0xf6, 0x8e, 0xb9, 0x3c, 0x99, 0x1c, 0xa9, 0xd4, 0x9b, 0x09, 0xf2,
0x09, 0x8f, 0xcc, 0xd7, 0x66, 0xca, 0xd0, 0xa6, 0x5e, 0x69, 0x53, 0x97, 0x31, 0x3e, 0x3a, 0xaa,
0x68, 0xf3, 0xd9, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x1b, 0x2b, 0xc8, 0x1a, 0x1e, 0x0f, 0x00,
0x00,
}
// Reference imports to suppress errors if they are not otherwise used.

View File

@ -118,7 +118,7 @@ type Core struct {
CallGetFlushedSegmentsService func(ctx context.Context, collID, partID typeutil.UniqueID) ([]typeutil.UniqueID, error)
//call index builder's client to build index, return build id
CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error)
CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error)
CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error
NewProxyClient func(sess *sessionutil.Session) (types.Proxy, error)
@ -727,7 +727,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
}
}()
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (retID typeutil.UniqueID, retErr error) {
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (retID typeutil.UniqueID, retErr error) {
defer func() {
if err := recover(); err != nil {
retErr = fmt.Errorf("build index panic, msg = %v", err)
@ -740,6 +740,8 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
IndexParams: idxInfo.IndexParams,
IndexID: idxInfo.IndexID,
IndexName: idxInfo.IndexName,
NumRows: numRows,
FieldSchema: field,
})
if err != nil {
return retID, err
@ -864,7 +866,7 @@ func (c *Core) BuildIndex(ctx context.Context, segID typeutil.UniqueID, field *s
if err != nil {
return 0, err
}
bldID, err = c.CallBuildIndexService(ctx, binlogs, field, idxInfo)
bldID, err = c.CallBuildIndexService(ctx, binlogs, field, idxInfo, rows)
if err != nil {
return 0, err
}

View File

@ -2418,7 +2418,7 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.NotNil(t, err)
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error) {
return 0, nil
}
err = c.checkInit()
@ -2595,7 +2595,7 @@ func TestCheckFlushedSegments(t *testing.T) {
core.MetaTable.indexID2Meta[indexID] = etcdpb.IndexInfo{
IndexID: indexID,
}
core.CallBuildIndexService = func(_ context.Context, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo) (int64, error) {
core.CallBuildIndexService = func(_ context.Context, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo, numRows int64) (int64, error) {
assert.Equal(t, fieldID, field.FieldID)
assert.Equal(t, indexID, idx.IndexID)
return -1, errors.New("build index build")
@ -2604,7 +2604,7 @@ func TestCheckFlushedSegments(t *testing.T) {
core.checkFlushedSegments(ctx)
var indexBuildID int64 = 10001
core.CallBuildIndexService = func(_ context.Context, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo) (int64, error) {
core.CallBuildIndexService = func(_ context.Context, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo, numRows int64) (int64, error) {
return indexBuildID, nil
}
core.checkFlushedSegments(core.ctx)