Read vector from disk (#6707)

* Read vector from disk

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* go fmt

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix git action error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix test error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix action error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix calculate error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* change var name

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* remove unused method

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* remove unused method

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix len error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* remove unused code

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* change bytes to float method

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* change float to bytes method

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix action error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
godchen 2021-07-24 09:25:22 +08:00 committed by GitHub
parent aa20d88f6f
commit db94d7771f
21 changed files with 365 additions and 408 deletions
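The central change: vector raw data is no longer pinned in query-node memory. A field's binlog is downloaded to local disk once via the reworked VectorChunkManager, and individual rows are then read back on demand. A minimal end-to-end sketch, assuming the storage APIs introduced in the diffs below (readVectorRow is a hypothetical helper; paths and schema are placeholders):

package example

import (
	"bytes"
	"encoding/binary"

	"github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/storage"
)

// readVectorRow fetches one float vector row from a binlog without
// keeping the whole field in memory.
func readVectorRow(vcm *storage.VectorChunkManager, schema *etcdpb.CollectionMeta,
	binlogPath string, row, dim int64) ([]float32, error) {
	// Download and decode the binlog once; the raw little-endian vector
	// bytes end up cached by the local chunk manager.
	if err := vcm.DownloadVectorFile(binlogPath, schema); err != nil {
		return nil, err
	}
	// Random-access read: a float32 row occupies dim*4 bytes.
	rowBytes := dim * 4
	buf := make([]byte, rowBytes)
	if _, err := vcm.ReadAt(binlogPath, buf, row*rowBytes); err != nil {
		return nil, err
	}
	vec := make([]float32, dim)
	if err := binary.Read(bytes.NewReader(buf), binary.LittleEndian, vec); err != nil {
		return nil, err
	}
	return vec, nil
}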

View File

@ -83,10 +83,10 @@ type segmentFlushUnit struct {
type insertBuffer struct {
insertData map[UniqueID]*InsertData // SegmentID to InsertData
maxSize int32
maxSize int64
}
func (ib *insertBuffer) size(segmentID UniqueID) int32 {
func (ib *insertBuffer) size(segmentID UniqueID) int64 {
if ib.insertData == nil || len(ib.insertData) <= 0 {
return 0
}
@ -95,16 +95,32 @@ func (ib *insertBuffer) size(segmentID UniqueID) int32 {
return 0
}
var maxSize int32 = 0
var maxSize int64 = 0
for _, data := range idata.Data {
fdata, ok := data.(*storage.FloatVectorFieldData)
if ok && int32(fdata.NumRows) > maxSize {
maxSize = int32(fdata.NumRows)
if ok {
totalNumRows := int64(0)
if fdata.NumRows != nil {
for _, numRow := range fdata.NumRows {
totalNumRows += numRow
}
}
if totalNumRows > maxSize {
maxSize = totalNumRows
}
}
bdata, ok := data.(*storage.BinaryVectorFieldData)
if ok && int32(bdata.NumRows) > maxSize {
maxSize = int32(bdata.NumRows)
if ok {
totalNumRows := int64(0)
if bdata.NumRows != nil {
for _, numRow := range bdata.NumRows {
totalNumRows += numRow
}
}
if totalNumRows > maxSize {
maxSize = totalNumRows
}
}
}
@ -112,7 +128,7 @@ func (ib *insertBuffer) size(segmentID UniqueID) int32 {
}
func (ib *insertBuffer) full(segmentID UniqueID) bool {
log.Debug("Segment size", zap.Any("segment", segmentID), zap.Int32("size", ib.size(segmentID)), zap.Int32("maxsize", ib.maxSize))
log.Debug("Segment size", zap.Any("segment", segmentID), zap.Int64("size", ib.size(segmentID)), zap.Int64("maxsize", ib.maxSize))
return ib.size(segmentID) >= ib.maxSize
}
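NumRows changes from a scalar to a per-binlog slice throughout this change, so any consumer that needs a flat count now sums the slice, exactly as size() does above. A minimal sketch of the recurring pattern (totalNumRows is a hypothetical helper; the hunks in this PR inline the loop):

func totalNumRows(numRows []int64) int64 {
	var total int64
	for _, n := range numRows {
		total += n
	}
	return total
}

// A field buffered from two insert batches of 2 rows each:
// totalNumRows([]int64{2, 2}) == 4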
@ -247,7 +263,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
NumRows: 0,
NumRows: make([]int64, 0, 1),
Data: make([]float32, 0),
Dim: dim,
}
@ -269,7 +285,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
}
pos += offset
fieldData.NumRows += len(msg.RowIDs)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_BinaryVector:
var dim int
@ -289,7 +305,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
NumRows: 0,
NumRows: make([]int64, 0, 1),
Data: make([]byte, 0),
Dim: dim,
}
@ -303,12 +319,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
offset = len(bv)
}
pos += offset
fieldData.NumRows += len(msg.RowData)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Bool:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BoolFieldData{
NumRows: 0,
NumRows: make([]int64, 0, 1),
Data: make([]bool, 0),
}
}
@ -324,12 +340,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows += len(msg.RowIDs)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int8:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int8FieldData{
NumRows: 0,
NumRows: make([]int64, 0, 1),
Data: make([]int8, 0),
}
}
@ -344,12 +360,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows += len(msg.RowIDs)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int16:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int16FieldData{
NumRows: 0,
NumRows: make([]int64, 0, 1),
Data: make([]int16, 0),
}
}
@ -364,12 +380,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows += len(msg.RowIDs)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int32:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int32FieldData{
NumRows: 0,
NumRows: make([]int64, 0, 1),
Data: make([]int32, 0),
}
}
@ -379,17 +395,17 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for _, blob := range msg.RowData {
buf := bytes.NewReader(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Error("binary.Read int32 wrong", zap.Error(err))
log.Error("binary.Read int64 wrong", zap.Error(err))
}
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows += len(msg.RowIDs)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int64:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int64FieldData{
NumRows: 0,
NumRows: make([]int64, 0, 1),
Data: make([]int64, 0),
}
}
@ -398,12 +414,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
switch field.FieldID {
case 0: // rowIDs
fieldData.Data = append(fieldData.Data, msg.RowIDs...)
fieldData.NumRows += len(msg.RowIDs)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case 1: // Timestamps
for _, ts := range msg.Timestamps {
fieldData.Data = append(fieldData.Data, int64(ts))
}
fieldData.NumRows += len(msg.Timestamps)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
default:
var v int64
for _, blob := range msg.RowData {
@ -414,13 +430,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows += len(msg.RowIDs)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
}
case schemapb.DataType_Float:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatFieldData{
NumRows: 0,
NumRows: make([]int64, 0, 1),
Data: make([]float32, 0),
}
}
@ -435,12 +451,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows += len(msg.RowIDs)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Double:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.DoubleFieldData{
NumRows: 0,
NumRows: make([]int64, 0, 1),
Data: make([]float64, 0),
}
}
@ -456,7 +472,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows += len(msg.RowIDs)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
}
}
@ -475,7 +491,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Debug("......")
break
}
log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int32("buffer size", ibNode.insertBuffer.size(k)))
log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int64("buffer size", ibNode.insertBuffer.size(k)))
stopSign++
}
}
@ -486,7 +502,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// If full, auto flush
if ibNode.insertBuffer.full(segToFlush) {
log.Debug(". Insert Buffer full, auto flushing ",
zap.Int32("num of rows", ibNode.insertBuffer.size(segToFlush)))
zap.Int64("num of rows", ibNode.insertBuffer.size(segToFlush)))
collMeta, err := ibNode.getCollMetabySegID(segToFlush, iMsg.timeRange.timestampMax)
if err != nil {

View File

@ -151,15 +151,15 @@ func TestFlushSegment(t *testing.T) {
Data: make(map[storage.FieldID]storage.FieldData),
}
insertData.Data[0] = &storage.Int64FieldData{
NumRows: 10,
NumRows: []int64{10},
Data: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}
insertData.Data[1] = &storage.Int64FieldData{
NumRows: 10,
NumRows: []int64{10},
Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
}
insertData.Data[107] = &storage.FloatFieldData{
NumRows: 10,
NumRows: []int64{10},
Data: make([]float32, 10),
}
flushMap.Store(segmentID, insertData)
@ -374,9 +374,9 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
if i == 1 {
assert.Equal(t, test.expectedSegID, flushUnit[0].segID)
assert.Equal(t, int32(0), iBNode.insertBuffer.size(UniqueID(i+1)))
assert.Equal(t, int64(0), iBNode.insertBuffer.size(UniqueID(i+1)))
} else {
assert.Equal(t, int32(1), iBNode.insertBuffer.size(UniqueID(i+1)))
assert.Equal(t, int64(1), iBNode.insertBuffer.size(UniqueID(i+1)))
}
}

View File

@ -30,7 +30,7 @@ type ParamTable struct {
Port int
FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32
FlushInsertBufferSize int32
FlushInsertBufferSize int64
InsertBinlogRootPath string
StatsBinlogRootPath string
Log log.Config
@ -124,7 +124,7 @@ func (p *ParamTable) initFlowGraphMaxParallelism() {
// ---- flush configs ----
func (p *ParamTable) initFlushInsertBufferSize() {
p.FlushInsertBufferSize = p.ParseInt32("datanode.flush.insertBufSize")
p.FlushInsertBufferSize = p.ParseInt64("datanode.flush.insertBufSize")
}
func (p *ParamTable) initInsertBinlogRootPath() {

View File

@ -86,11 +86,11 @@ func TestIndexNode(t *testing.T) {
tsData[i] = int64(i + 100)
}
data[tsFieldID] = &storage.Int64FieldData{
NumRows: nb,
NumRows: []int64{nb},
Data: tsData,
}
data[floatVectorFieldID] = &storage.FloatVectorFieldData{
NumRows: nb,
NumRows: []int64{nb},
Data: generateFloatVectors(),
Dim: dim,
}
@ -201,11 +201,11 @@ func TestIndexNode(t *testing.T) {
tsData[i] = int64(i + 100)
}
data[tsFieldID] = &storage.Int64FieldData{
NumRows: nb,
NumRows: []int64{nb},
Data: tsData,
}
data[binaryVectorFieldID] = &storage.BinaryVectorFieldData{
NumRows: nb,
NumRows: []int64{nb},
Data: generateBinaryVectors(),
Dim: dim,
}
@ -313,11 +313,11 @@ func TestIndexNode(t *testing.T) {
tsData[i] = int64(i + 100)
}
data[tsFieldID] = &storage.Int64FieldData{
NumRows: nb,
NumRows: []int64{nb},
Data: tsData,
}
data[floatVectorFieldID] = &storage.FloatVectorFieldData{
NumRows: nb,
NumRows: []int64{nb},
Data: generateFloatVectors(),
Dim: dim,
}

View File

@ -146,19 +146,19 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID
insertData := &storage.InsertData{
Data: map[int64]storage.FieldData{
0: &storage.Int64FieldData{
NumRows: msgLength,
NumRows: []int64{msgLength},
Data: idData,
},
1: &storage.Int64FieldData{
NumRows: msgLength,
NumRows: []int64{msgLength},
Data: timestamps,
},
100: &storage.Int64FieldData{
NumRows: msgLength,
NumRows: []int64{msgLength},
Data: fieldAgeData,
},
101: &storage.FloatVectorFieldData{
NumRows: msgLength,
NumRows: []int64{msgLength},
Data: fieldVecData,
Dim: DIM,
},

View File

@ -772,20 +772,20 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID
insertData := &storage.InsertData{
Data: map[int64]storage.FieldData{
0: &storage.Int64FieldData{
NumRows: msgLength,
NumRows: []int64{msgLength},
Data: idData,
},
1: &storage.Int64FieldData{
NumRows: msgLength,
NumRows: []int64{msgLength},
Data: timestamps,
},
100: &storage.FloatVectorFieldData{
NumRows: msgLength,
NumRows: []int64{msgLength},
Data: fieldVecData,
Dim: DIM,
},
101: &storage.Int32FieldData{
NumRows: msgLength,
NumRows: []int64{msgLength},
Data: fieldAgeData,
},
},

View File

@ -12,6 +12,7 @@
package querynode
import (
"bytes"
"context"
"encoding/binary"
"fmt"
@ -27,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
@ -63,6 +65,8 @@ type queryCollection struct {
queryMsgStream msgstream.MsgStream
queryResultMsgStream msgstream.MsgStream
vcm *storage.VectorChunkManager
}
type ResultEntityIds []UniqueID
@ -72,13 +76,17 @@ func newQueryCollection(releaseCtx context.Context,
collectionID UniqueID,
historical *historical,
streaming *streaming,
factory msgstream.Factory) *queryCollection {
factory msgstream.Factory,
lcm storage.ChunkManager,
rcm storage.ChunkManager) *queryCollection {
unsolvedMsg := make([]queryMsg, 0)
queryStream, _ := factory.NewQueryMsgStream(releaseCtx)
queryResultStream, _ := factory.NewQueryMsgStream(releaseCtx)
vcm := storage.NewVectorChunkManager(lcm, rcm)
qc := &queryCollection{
releaseCtx: releaseCtx,
cancel: cancel,
@ -93,6 +101,8 @@ func newQueryCollection(releaseCtx context.Context,
queryMsgStream: queryStream,
queryResultMsgStream: queryResultStream,
vcm: vcm,
}
qc.register()
@ -1059,55 +1069,79 @@ func (q *queryCollection) search(msg queryMsg) error {
}
func (q *queryCollection) fillVectorFieldsData(segment *Segment, result *segcorepb.RetrieveResults) error {
collection, _ := q.streaming.replica.getCollectionByID(q.collectionID)
schema := &etcdpb.CollectionMeta{
ID: q.collectionID,
Schema: collection.schema}
schemaHelper, err := typeutil.CreateSchemaHelper(collection.schema)
if err != nil {
return err
}
for _, resultFieldData := range result.FieldsData {
vecFieldInfo, err := segment.getVectorFieldInfo(resultFieldData.FieldId)
if err != nil {
continue
}
// if vector raw data is in memory, the result should already have been filled with valid vector raw data
if vecFieldInfo.getRawDataInMemory() {
continue
}
// load vector field data
if err = q.historical.loader.loadSegmentVectorFieldData(vecFieldInfo); err != nil {
return err
}
log.Debug("FillVectorFieldData", zap.Any("fieldID", resultFieldData.FieldId))
for i, offset := range result.Offset {
var success bool
for _, path := range vecFieldInfo.fieldBinlog.Binlogs {
rawData := vecFieldInfo.getRawData(path)
var numRows, dim int64
switch fieldData := rawData.(type) {
case *storage.FloatVectorFieldData:
numRows = int64(fieldData.NumRows)
dim = int64(fieldData.Dim)
if offset < numRows {
copy(resultFieldData.GetVectors().GetFloatVector().Data[int64(i)*dim:int64(i+1)*dim], fieldData.Data[offset*dim:(offset+1)*dim])
success = true
} else {
offset -= numRows
}
case *storage.BinaryVectorFieldData:
numRows = int64(fieldData.NumRows)
dim = int64(fieldData.Dim)
if offset < numRows {
x := resultFieldData.GetVectors().GetData().(*schemapb.VectorField_BinaryVector)
copy(x.BinaryVector[int64(i)*dim/8:int64(i+1)*dim/8], fieldData.Data[offset*dim/8:(offset+1)*dim/8])
success = true
} else {
offset -= numRows
}
default:
return fmt.Errorf("unexpected field data type")
}
if success {
var vecPath string
for index, idBinlogRowSize := range segment.idBinlogRowSizes {
if offset < idBinlogRowSize {
vecPath = vecFieldInfo.fieldBinlog.Binlogs[index]
break
} else {
offset -= idBinlogRowSize
}
}
log.Debug("FillVectorFieldData", zap.Any("path", vecPath))
err := q.vcm.DownloadVectorFile(vecPath, schema)
if err != nil {
return err
}
dim := resultFieldData.GetVectors().GetDim()
log.Debug("FillVectorFieldData", zap.Any("dim", dim))
schema, err := schemaHelper.GetFieldFromID(resultFieldData.FieldId)
if err != nil {
return err
}
dataType := schema.DataType
log.Debug("FillVectorFieldData", zap.Any("datatype", dataType))
switch dataType {
case schemapb.DataType_BinaryVector:
rowBytes := dim / 8
x := resultFieldData.GetVectors().GetData().(*schemapb.VectorField_BinaryVector)
content := make([]byte, rowBytes)
_, err := q.vcm.ReadAt(vecPath, content, offset*rowBytes)
if err != nil {
return err
}
log.Debug("FillVectorFieldData", zap.Any("binaryVectorResult", content))
resultLen := dim / 8
copy(x.BinaryVector[i*int(resultLen):(i+1)*int(resultLen)], content)
case schemapb.DataType_FloatVector:
x := resultFieldData.GetVectors().GetData().(*schemapb.VectorField_FloatVector)
rowBytes := dim * 4
content := make([]byte, rowBytes)
_, err := q.vcm.ReadAt(vecPath, content, offset*rowBytes)
if err != nil {
return err
}
floatResult := make([]float32, dim)
buf := bytes.NewReader(content)
err = binary.Read(buf, binary.LittleEndian, &floatResult)
if err != nil {
return err
}
log.Debug("FillVectorFieldData", zap.Any("floatVectorResult", floatResult))
resultLen := dim
copy(x.FloatVector.Data[i*int(resultLen):(i+1)*int(resultLen)], floatResult)
}
}
}
return nil
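The offset arithmetic above is worth spelling out: retrieve offsets are global to the segment, while ReadAt needs an offset local to one binlog file, so the loop peels per-binlog row counts (idBinlogRowSizes, captured at load time from the timestamp field) off the global offset until it lands inside a file. A hypothetical sketch under those assumptions:

func locateRow(idBinlogRowSizes []int64, binlogs []string, offset int64) (path string, local int64, ok bool) {
	for i, rows := range idBinlogRowSizes {
		if offset < rows {
			return binlogs[i], offset, true // offset is now local to this file
		}
		offset -= rows
	}
	return "", 0, false // offset past the end of the segment
}

// The local offset is then scaled to bytes before ReadAt:
//   float vectors:  read dim*4 bytes at local*dim*4
//   binary vectors: read dim/8 bytes at local*(dim/8)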

View File

@ -17,8 +17,10 @@ import (
"go.uber.org/zap"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/storage"
)
type queryService struct {
@ -31,6 +33,9 @@ type queryService struct {
queryCollections map[UniqueID]*queryCollection
factory msgstream.Factory
lcm storage.ChunkManager
rcm storage.ChunkManager
}
func newQueryService(ctx context.Context,
@ -39,6 +44,28 @@ func newQueryService(ctx context.Context,
factory msgstream.Factory) *queryService {
queryServiceCtx, queryServiceCancel := context.WithCancel(ctx)
path, err := Params.Load("storage.path")
if err != nil {
panic(err)
}
lcm := storage.NewLocalChunkManager(path)
option := &miniokv.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSLStr,
CreateBucket: true,
BucketName: Params.MinioBucketName,
}
client, err := miniokv.NewMinIOKV(ctx, option)
if err != nil {
panic(err)
}
rcm := storage.NewMinioChunkManager(client)
return &queryService{
ctx: queryServiceCtx,
cancel: queryServiceCancel,
@ -49,6 +76,9 @@ func newQueryService(ctx context.Context,
queryCollections: make(map[UniqueID]*queryCollection),
factory: factory,
lcm: lcm,
rcm: rcm,
}
}
@ -73,7 +103,9 @@ func (q *queryService) addQueryCollection(collectionID UniqueID) {
collectionID,
q.historical,
q.streaming,
q.factory)
q.factory,
q.lcm,
q.rcm)
q.queryCollections[collectionID] = qc
}

View File

@ -145,7 +145,6 @@ func TestSearch_Search(t *testing.T) {
node.historical,
node.streaming,
msFactory)
node.queryService.addQueryCollection(collectionID)
// load segment
err = node.historical.replica.addSegment(segmentID, defaultPartitionID, collectionID, "", segmentTypeSealed, true)
@ -155,6 +154,8 @@ func TestSearch_Search(t *testing.T) {
err = loadFields(segment, DIM, N)
assert.NoError(t, err)
node.queryService.addQueryCollection(collectionID)
err = sendSearchRequest(node.queryNodeLoopCtx, DIM)
assert.NoError(t, err)

View File

@ -36,7 +36,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/storage"
)
type segmentType int32
@ -49,47 +48,15 @@ const (
)
type VectorFieldInfo struct {
mu sync.RWMutex
fieldBinlog *datapb.FieldBinlog
rawDataInMemory bool
rawData map[string]storage.FieldData // map[binlogPath]FieldData
fieldBinlog *datapb.FieldBinlog
}
func newVectorFieldInfo(fieldBinlog *datapb.FieldBinlog) *VectorFieldInfo {
return &VectorFieldInfo{
fieldBinlog: fieldBinlog,
rawDataInMemory: false,
rawData: make(map[string]storage.FieldData),
fieldBinlog: fieldBinlog,
}
}
func (v *VectorFieldInfo) setRawData(binlogPath string, data storage.FieldData) {
v.mu.Lock()
defer v.mu.Unlock()
v.rawData[binlogPath] = data
}
func (v *VectorFieldInfo) getRawData(binlogPath string) storage.FieldData {
v.mu.Lock()
defer v.mu.Unlock()
if data, ok := v.rawData[binlogPath]; ok {
return data
}
return nil
}
func (v *VectorFieldInfo) setRawDataInMemory(flag bool) {
v.mu.Lock()
defer v.mu.Unlock()
v.rawDataInMemory = flag
}
func (v *VectorFieldInfo) getRawDataInMemory() bool {
v.mu.Lock()
defer v.mu.Unlock()
return v.rawDataInMemory
}
//--------------------------------------------------------------------------------------
type Segment struct {
segmentPtr C.CSegmentInterface
@ -116,6 +83,8 @@ type Segment struct {
paramMutex sync.RWMutex // guards index
indexInfos map[int64]*indexInfo
idBinlogRowSizes []int64
vectorFieldMutex sync.RWMutex // guards vectorFieldInfos
vectorFieldInfos map[UniqueID]*VectorFieldInfo
}
@ -137,6 +106,14 @@ func (s *Segment) getEnableIndex() bool {
return s.enableIndex
}
func (s *Segment) setIDBinlogRowSizes(sizes []int64) {
s.idBinlogRowSizes = sizes
}
func (s *Segment) getIDBinlogRowSizes() []int64 {
return s.idBinlogRowSizes
}
func (s *Segment) setRecentlyModified(modify bool) {
s.rmMutex.Lock()
defer s.rmMutex.Unlock()

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
@ -242,10 +243,6 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
}
blobs = append(blobs, blob)
}
// mark the flag that vector raw data will be loaded into memory
if vecFieldInfo, err := segment.getVectorFieldInfo(fb.FieldID); err == nil {
vecFieldInfo.setRawDataInMemory(true)
}
}
_, _, insertData, err := iCodec.Deserialize(blobs)
@ -255,7 +252,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
}
for fieldID, value := range insertData.Data {
var numRows int
var numRows []int64
var data interface{}
switch fieldData := value.(type) {
case *storage.BoolFieldData:
@ -291,7 +288,14 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
default:
return errors.New("unexpected field data type")
}
err = segment.segmentLoadFieldData(fieldID, numRows, data)
if fieldID == rootcoord.TimeStampField {
segment.setIDBinlogRowSizes(numRows)
}
totalNumRows := int64(0)
for _, numRow := range numRows {
totalNumRows += numRow
}
err = segment.segmentLoadFieldData(fieldID, int(totalNumRows), data)
if err != nil {
// TODO: return or continue?
return err
@ -301,44 +305,6 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
return nil
}
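Two things fall out of the per-binlog NumRows slice here: the timestamp field's slice is recorded as idBinlogRowSizes (the offset map used by fillVectorFieldsData above), and segcore still receives a flat total. A hypothetical condensation of the logic, assuming the types used in this hunk:

func loadField(segment *Segment, fieldID int64, numRows []int64, data interface{}) error {
	if fieldID == rootcoord.TimeStampField {
		// remember per-binlog row counts for later offset-to-file mapping
		segment.setIDBinlogRowSizes(numRows)
	}
	var total int64
	for _, n := range numRows {
		total += n
	}
	return segment.segmentLoadFieldData(fieldID, int(total), data)
}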
func (loader *segmentLoader) loadSegmentVectorFieldData(info *VectorFieldInfo) error {
iCodec := storage.InsertCodec{}
defer func() {
err := iCodec.Close()
if err != nil {
log.Error(err.Error())
}
}()
for _, path := range info.fieldBinlog.Binlogs {
if data := info.getRawData(path); data != nil {
continue
}
log.Debug("load vector raw data", zap.String("path", path))
binLog, err := loader.minioKV.Load(path)
if err != nil {
return err
}
blob := &storage.Blob{
Key: path,
Value: []byte(binLog),
}
insertFieldData, err := iCodec.DeserializeOneVectorBinlog(blob)
if err != nil {
log.Error(err.Error())
return err
}
// save raw data into segment.vectorFieldInfo
info.setRawData(path, insertFieldData.Data)
}
return nil
}
func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,

View File

@ -72,44 +72,44 @@ func (b Blob) GetValue() []byte {
type FieldData interface{}
type BoolFieldData struct {
NumRows int
NumRows []int64
Data []bool
}
type Int8FieldData struct {
NumRows int
NumRows []int64
Data []int8
}
type Int16FieldData struct {
NumRows int
NumRows []int64
Data []int16
}
type Int32FieldData struct {
NumRows int
NumRows []int64
Data []int32
}
type Int64FieldData struct {
NumRows int
NumRows []int64
Data []int64
}
type FloatFieldData struct {
NumRows int
NumRows []int64
Data []float32
}
type DoubleFieldData struct {
NumRows int
NumRows []int64
Data []float64
}
type StringFieldData struct {
NumRows int
NumRows []int64
Data []string
}
type BinaryVectorFieldData struct {
NumRows int
NumRows []int64
Data []byte
Dim int
}
type FloatVectorFieldData struct {
NumRows int
NumRows []int64
Data []float32
Dim int
}
@ -134,12 +134,6 @@ type InsertData struct {
Infos []BlobInfo
}
type InsertFieldData struct {
ID FieldID
Data FieldData
Infos []BlobInfo
}
// Blob key example:
// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
type InsertCodec struct {
@ -299,7 +293,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
boolFieldData.NumRows += length
boolFieldData.NumRows = append(boolFieldData.NumRows, int64(length))
resultData.Data[fieldID] = boolFieldData
case schemapb.DataType_Int8:
if resultData.Data[fieldID] == nil {
@ -316,7 +310,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
int8FieldData.NumRows += length
int8FieldData.NumRows = append(int8FieldData.NumRows, int64(length))
resultData.Data[fieldID] = int8FieldData
case schemapb.DataType_Int16:
if resultData.Data[fieldID] == nil {
@ -333,7 +327,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
int16FieldData.NumRows += length
int16FieldData.NumRows = append(int16FieldData.NumRows, int64(length))
resultData.Data[fieldID] = int16FieldData
case schemapb.DataType_Int32:
if resultData.Data[fieldID] == nil {
@ -350,7 +344,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
int32FieldData.NumRows += length
int32FieldData.NumRows = append(int32FieldData.NumRows, int64(length))
resultData.Data[fieldID] = int32FieldData
case schemapb.DataType_Int64:
if resultData.Data[fieldID] == nil {
@ -367,7 +361,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
int64FieldData.NumRows += length
int64FieldData.NumRows = append(int64FieldData.NumRows, int64(length))
resultData.Data[fieldID] = int64FieldData
case schemapb.DataType_Float:
if resultData.Data[fieldID] == nil {
@ -384,7 +378,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
floatFieldData.NumRows += length
floatFieldData.NumRows = append(floatFieldData.NumRows, int64(length))
resultData.Data[fieldID] = floatFieldData
case schemapb.DataType_Double:
if resultData.Data[fieldID] == nil {
@ -401,7 +395,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
doubleFieldData.NumRows += length
doubleFieldData.NumRows = append(doubleFieldData.NumRows, int64(length))
resultData.Data[fieldID] = doubleFieldData
case schemapb.DataType_String:
if resultData.Data[fieldID] == nil {
@ -413,7 +407,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
stringFieldData.NumRows += length
stringFieldData.NumRows = append(stringFieldData.NumRows, int64(length))
for i := 0; i < length; i++ {
singleString, err := eventReader.GetOneStringFromPayload(i)
if err != nil {
@ -438,7 +432,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
binaryVectorFieldData.NumRows += length
binaryVectorFieldData.NumRows = append(binaryVectorFieldData.NumRows, int64(length))
resultData.Data[fieldID] = binaryVectorFieldData
case schemapb.DataType_FloatVector:
if resultData.Data[fieldID] == nil {
@ -456,7 +450,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
floatVectorFieldData.NumRows += length
floatVectorFieldData.NumRows = append(floatVectorFieldData.NumRows, int64(length))
resultData.Data[fieldID] = floatVectorFieldData
default:
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("undefined data type %d", dataType)
@ -474,75 +468,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return pID, sID, resultData, nil
}
func (insertCodec *InsertCodec) DeserializeOneVectorBinlog(blob *Blob) (data *InsertFieldData, err error) {
resultData := &InsertFieldData{
ID: InvalidUniqueID,
}
binlogReader, err := NewBinlogReader(blob.Value)
if err != nil {
return nil, err
}
dataType := binlogReader.PayloadDataType
fieldID := binlogReader.FieldID
totalLength := 0
for {
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return nil, err
}
if eventReader == nil {
break
}
switch dataType {
case schemapb.DataType_BinaryVector:
if resultData.ID == InvalidUniqueID {
resultData.ID = fieldID
resultData.Data = &BinaryVectorFieldData{}
}
binaryVectorFieldData := resultData.Data.(*BinaryVectorFieldData)
var singleData []byte
singleData, binaryVectorFieldData.Dim, err = eventReader.GetBinaryVectorFromPayload()
if err != nil {
return nil, err
}
binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return nil, err
}
totalLength += length
binaryVectorFieldData.NumRows += length
resultData.Data = binaryVectorFieldData
case schemapb.DataType_FloatVector:
if resultData.ID == InvalidUniqueID {
resultData.ID = fieldID
resultData.Data = &FloatVectorFieldData{}
}
floatVectorFieldData := resultData.Data.(*FloatVectorFieldData)
var singleData []float32
singleData, floatVectorFieldData.Dim, err = eventReader.GetFloatVectorFromPayload()
if err != nil {
return nil, err
}
floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return nil, err
}
totalLength += length
floatVectorFieldData.NumRows += length
resultData.Data = floatVectorFieldData
default:
return nil, fmt.Errorf("undefined data type %d", dataType)
}
}
if err = binlogReader.Close(); err != nil {
return nil, err
}
return resultData, nil
}
func (insertCodec *InsertCodec) Close() error {
for _, closeFunc := range insertCodec.readerCloseFunc {
err := closeFunc()

View File

@ -142,52 +142,52 @@ func TestInsertCodec(t *testing.T) {
insertData1 := &InsertData{
Data: map[int64]FieldData{
RowIDField: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
TimestampField: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
BoolField: &BoolFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []bool{true, false},
},
Int8Field: &Int8FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int8{3, 4},
},
Int16Field: &Int16FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int16{3, 4},
},
Int32Field: &Int32FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int32{3, 4},
},
Int64Field: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
FloatField: &FloatFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{3, 4},
},
DoubleField: &DoubleFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float64{3, 4},
},
StringField: &StringFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []string{"3", "4"},
},
BinaryVectorField: &BinaryVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
},
FloatVectorField: &FloatVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{4, 5, 6, 7, 4, 5, 6, 7},
Dim: 4,
},
@ -197,52 +197,52 @@ func TestInsertCodec(t *testing.T) {
insertData2 := &InsertData{
Data: map[int64]FieldData{
RowIDField: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{1, 2},
},
TimestampField: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{1, 2},
},
BoolField: &BoolFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []bool{true, false},
},
Int8Field: &Int8FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int8{1, 2},
},
Int16Field: &Int16FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int16{1, 2},
},
Int32Field: &Int32FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int32{1, 2},
},
Int64Field: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{1, 2},
},
FloatField: &FloatFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{1, 2},
},
DoubleField: &DoubleFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float64{1, 2},
},
StringField: &StringFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []string{"1", "2"},
},
BinaryVectorField: &BinaryVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
},
FloatVectorField: &FloatVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{0, 1, 2, 3, 0, 1, 2, 3},
Dim: 4,
},
@ -265,18 +265,18 @@ func TestInsertCodec(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, UniqueID(PartitionID), partID)
assert.Equal(t, UniqueID(SegmentID), segID)
assert.Equal(t, 4, resultData.Data[RowIDField].(*Int64FieldData).NumRows)
assert.Equal(t, 4, resultData.Data[TimestampField].(*Int64FieldData).NumRows)
assert.Equal(t, 4, resultData.Data[BoolField].(*BoolFieldData).NumRows)
assert.Equal(t, 4, resultData.Data[Int8Field].(*Int8FieldData).NumRows)
assert.Equal(t, 4, resultData.Data[Int16Field].(*Int16FieldData).NumRows)
assert.Equal(t, 4, resultData.Data[Int32Field].(*Int32FieldData).NumRows)
assert.Equal(t, 4, resultData.Data[Int64Field].(*Int64FieldData).NumRows)
assert.Equal(t, 4, resultData.Data[FloatField].(*FloatFieldData).NumRows)
assert.Equal(t, 4, resultData.Data[DoubleField].(*DoubleFieldData).NumRows)
assert.Equal(t, 4, resultData.Data[StringField].(*StringFieldData).NumRows)
assert.Equal(t, 4, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).NumRows)
assert.Equal(t, 4, resultData.Data[FloatVectorField].(*FloatVectorFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[RowIDField].(*Int64FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[TimestampField].(*Int64FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[BoolField].(*BoolFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[Int8Field].(*Int8FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[Int16Field].(*Int16FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[Int32Field].(*Int32FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[Int64Field].(*Int64FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[FloatField].(*FloatFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[DoubleField].(*DoubleFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[StringField].(*StringFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[FloatVectorField].(*FloatVectorFieldData).NumRows)
assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[RowIDField].(*Int64FieldData).Data)
assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[TimestampField].(*Int64FieldData).Data)
assert.Equal(t, []bool{true, false, true, false}, resultData.Data[BoolField].(*BoolFieldData).Data)
@ -412,15 +412,15 @@ func TestSchemaError(t *testing.T) {
insertData := &InsertData{
Data: map[int64]FieldData{
RowIDField: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
TimestampField: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
BoolField: &BoolFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []bool{true, false},
},
},

View File

@ -123,52 +123,52 @@ func TestDataSorter(t *testing.T) {
insertDataFirst := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{6, 4},
},
1: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
100: &BoolFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []bool{true, false},
},
101: &Int8FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int8{3, 4},
},
102: &Int16FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int16{3, 4},
},
103: &Int32FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int32{3, 4},
},
104: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
105: &FloatFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{3, 4},
},
106: &DoubleFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float64{3, 4},
},
107: &StringFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []string{"3", "4"},
},
108: &BinaryVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
},
109: &FloatVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
Dim: 8,
},

View File

@ -25,15 +25,12 @@ type LocalChunkManager struct {
}
func NewLocalChunkManager(localPath string) *LocalChunkManager {
if _, err := os.Stat(localPath); os.IsNotExist(err) {
os.MkdirAll(localPath, os.ModePerm)
}
return &LocalChunkManager{
localPath: localPath,
}
}
func (lcm *LocalChunkManager) Load(key string) (string, error) {
func (lcm *LocalChunkManager) GetPath(key string) (string, error) {
if !lcm.Exist(key) {
return "", errors.New("local file cannot be found with key:" + key)
}
@ -42,8 +39,15 @@ func (lcm *LocalChunkManager) Load(key string) (string, error) {
}
func (lcm *LocalChunkManager) Write(key string, content []byte) error {
path := path.Join(lcm.localPath, key)
err := ioutil.WriteFile(path, content, 0644)
filePath := path.Join(lcm.localPath, key)
dir := path.Dir(filePath)
if _, err := os.Stat(dir); os.IsNotExist(err) {
err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
return err
}
}
err := ioutil.WriteFile(filePath, content, 0644)
if err != nil {
return err
}
@ -59,7 +63,7 @@ func (lcm *LocalChunkManager) Exist(key string) bool {
return true
}
func (lcm *LocalChunkManager) ReadAll(key string) ([]byte, error) {
func (lcm *LocalChunkManager) Read(key string) ([]byte, error) {
path := path.Join(lcm.localPath, key)
file, err := os.Open(path)
if err != nil {

View File

@ -27,7 +27,7 @@ func NewMinioChunkManager(minio *miniokv.MinIOKV) *MinioChunkManager {
}
}
func (mcm *MinioChunkManager) Load(key string) (string, error) {
func (mcm *MinioChunkManager) GetPath(key string) (string, error) {
if !mcm.Exist(key) {
return "", errors.New("minio file manage cannot be found with key:" + key)
}
@ -42,7 +42,7 @@ func (mcm *MinioChunkManager) Exist(key string) bool {
return mcm.minio.Exist(key)
}
func (mcm *MinioChunkManager) ReadAll(key string) ([]byte, error) {
func (mcm *MinioChunkManager) Read(key string) ([]byte, error) {
results, err := mcm.minio.Load(key)
return []byte(results), err
}

View File

@ -172,52 +172,52 @@ func TestPrintBinlogFiles(t *testing.T) {
insertDataFirst := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
1: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
100: &BoolFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []bool{true, false},
},
101: &Int8FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int8{3, 4},
},
102: &Int16FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int16{3, 4},
},
103: &Int32FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int32{3, 4},
},
104: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
105: &FloatFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{3, 4},
},
106: &DoubleFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float64{3, 4},
},
107: &StringFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []string{"3", "4"},
},
108: &BinaryVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
},
109: &FloatVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
Dim: 8,
},
@ -227,52 +227,52 @@ func TestPrintBinlogFiles(t *testing.T) {
insertDataSecond := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{1, 2},
},
1: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{1, 2},
},
100: &BoolFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []bool{true, false},
},
101: &Int8FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int8{1, 2},
},
102: &Int16FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int16{1, 2},
},
103: &Int32FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int32{1, 2},
},
104: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{1, 2},
},
105: &FloatFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{1, 2},
},
106: &DoubleFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float64{1, 2},
},
107: &StringFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []string{"1", "2"},
},
108: &BinaryVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
},
109: &FloatVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
Dim: 8,
},

View File

@ -12,9 +12,9 @@
package storage
type ChunkManager interface {
Load(key string) (string, error)
GetPath(key string) (string, error)
Write(key string, content []byte) error
Exist(key string) bool
ReadAll(key string) ([]byte, error)
Read(key string) ([]byte, error)
ReadAt(key string, p []byte, off int64) (n int, err error)
}
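The contract is small enough to make concrete with a toy in-memory implementation (hypothetical, for illustration only; the real implementations are the local and MinIO managers shown above):

package example

import (
	"errors"
	"io"
)

// memChunkManager satisfies ChunkManager with a plain map.
type memChunkManager struct {
	chunks map[string][]byte
}

func (m *memChunkManager) GetPath(key string) (string, error) {
	if !m.Exist(key) {
		return "", errors.New("no chunk with key: " + key)
	}
	return key, nil // an in-memory store has no real filesystem path
}

func (m *memChunkManager) Write(key string, content []byte) error {
	m.chunks[key] = content
	return nil
}

func (m *memChunkManager) Exist(key string) bool {
	_, ok := m.chunks[key]
	return ok
}

func (m *memChunkManager) Read(key string) ([]byte, error) {
	if !m.Exist(key) {
		return nil, errors.New("no chunk with key: " + key)
	}
	return m.chunks[key], nil
}

func (m *memChunkManager) ReadAt(key string, p []byte, off int64) (int, error) {
	data, err := m.Read(key)
	if err != nil {
		return 0, err
	}
	if off < 0 || off >= int64(len(data)) {
		return 0, io.EOF
	}
	return copy(p, data[off:]), nil
}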

View File

@ -12,8 +12,9 @@
package storage
import (
"bytes"
"encoding/binary"
"math"
"errors"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
)
@ -21,35 +22,29 @@ import (
type VectorChunkManager struct {
localChunkManager ChunkManager
remoteChunkManager ChunkManager
insertCodec *InsertCodec
}
func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager, schema *etcdpb.CollectionMeta) *VectorChunkManager {
insertCodec := NewInsertCodec(schema)
func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager) *VectorChunkManager {
return &VectorChunkManager{
localChunkManager: localChunkManager,
remoteChunkManager: remoteChunkManager,
insertCodec: insertCodec,
}
}
func (vcm *VectorChunkManager) Load(key string) (string, error) {
if vcm.localChunkManager.Exist(key) {
return vcm.localChunkManager.Load(key)
}
content, err := vcm.remoteChunkManager.ReadAll(key)
func (vcm *VectorChunkManager) DownloadVectorFile(key string, schema *etcdpb.CollectionMeta) error {
insertCodec := NewInsertCodec(schema)
content, err := vcm.remoteChunkManager.Read(key)
if err != nil {
return "", err
return err
}
blob := &Blob{
Key: key,
Value: content,
}
_, _, data, err := vcm.insertCodec.Deserialize([]*Blob{blob})
_, _, data, err := insertCodec.Deserialize([]*Blob{blob})
if err != nil {
return "", err
return err
}
for _, singleData := range data.Data {
@ -59,16 +54,23 @@ func (vcm *VectorChunkManager) Load(key string) (string, error) {
}
floatVector, ok := singleData.(*FloatVectorFieldData)
if ok {
floatData := floatVector.Data
result := make([]byte, 0)
for _, singleFloat := range floatData {
result = append(result, Float32ToByte(singleFloat)...)
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.LittleEndian, floatVector.Data)
if err != nil {
return err
}
vcm.localChunkManager.Write(key, result)
vcm.localChunkManager.Write(key, buf.Bytes())
}
}
vcm.insertCodec.Close()
return vcm.localChunkManager.Load(key)
insertCodec.Close()
return nil
}
func (vcm *VectorChunkManager) GetPath(key string) (string, error) {
if vcm.localChunkManager.Exist(key) {
return vcm.localChunkManager.GetPath(key)
}
return vcm.localChunkManager.GetPath(key)
}
func (vcm *VectorChunkManager) Write(key string, content []byte) error {
@ -79,31 +81,13 @@ func (vcm *VectorChunkManager) Exist(key string) bool {
return vcm.localChunkManager.Exist(key)
}
func (vcm *VectorChunkManager) ReadAll(key string) ([]byte, error) {
func (vcm *VectorChunkManager) Read(key string) ([]byte, error) {
if vcm.localChunkManager.Exist(key) {
return vcm.localChunkManager.ReadAll(key)
return vcm.localChunkManager.Read(key)
}
_, err := vcm.Load(key)
if err != nil {
return nil, err
}
return vcm.localChunkManager.ReadAll(key)
return nil, errors.New("the vector file doesn't exist, please call download first")
}
func (vcm *VectorChunkManager) ReadAt(key string, p []byte, off int64) (n int, err error) {
return vcm.localChunkManager.ReadAt(key, p, off)
}
func Float32ToByte(float float32) []byte {
bits := math.Float32bits(float)
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, bits)
return bytes
}
func ByteToFloat32(bytes []byte) float32 {
bits := binary.LittleEndian.Uint32(bytes)
return math.Float32frombits(bits)
}
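Note the behavioral change in Read above: it no longer downloads on demand; a missing local copy is now an error, and callers must invoke DownloadVectorFile first. A short usage sketch assuming the APIs in this file (readVector is a hypothetical caller):

func readVector(vcm *VectorChunkManager, schema *etcdpb.CollectionMeta, key string) ([]byte, error) {
	if content, err := vcm.Read(key); err == nil {
		return content, nil // already cached by the local chunk manager
	}
	// "the vector file doesn't exist, please call download first"
	if err := vcm.DownloadVectorFile(key, schema); err != nil {
		return nil, err
	}
	return vcm.Read(key)
}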

View File

@ -22,6 +22,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
@ -44,7 +45,7 @@ func TestVectorChunkManager(t *testing.T) {
lcm := NewLocalChunkManager(localPath)
schema := initSchema()
vcm := NewVectorChunkManager(lcm, rcm, schema)
vcm := NewVectorChunkManager(lcm, rcm)
assert.NotNil(t, vcm)
binlogs := initBinlogFile(schema)
@ -52,22 +53,22 @@ func TestVectorChunkManager(t *testing.T) {
for _, binlog := range binlogs {
rcm.Write(binlog.Key, binlog.Value)
}
_, err = vcm.Load("108")
err = vcm.DownloadVectorFile("108", schema)
assert.Nil(t, err)
_, err = vcm.Load("109")
err = vcm.DownloadVectorFile("109", schema)
assert.Nil(t, err)
content, err := vcm.ReadAll("108")
content, err := vcm.Read("108")
assert.Nil(t, err)
assert.Equal(t, []byte{0, 255}, content)
content, err = vcm.ReadAll("109")
content, err = vcm.Read("109")
assert.Nil(t, err)
floatResult := make([]float32, 0)
for i := 0; i < len(content)/4; i++ {
singleData := ByteToFloat32(content[i*4 : i*4+4])
singleData := typeutil.ByteToFloat32(content[i*4 : i*4+4])
floatResult = append(floatResult, singleData)
}
assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, floatResult)
@ -79,7 +80,7 @@ func TestVectorChunkManager(t *testing.T) {
floatResult = make([]float32, 0)
for i := 0; i < len(content)/4; i++ {
singleData := ByteToFloat32(content[i*4 : i*4+4])
singleData := typeutil.ByteToFloat32(content[i*4 : i*4+4])
floatResult = append(floatResult, singleData)
}
assert.Equal(t, []float32{0, 111, 222, 333, 444, 555, 777, 666}, floatResult)
@ -163,24 +164,24 @@ func initBinlogFile(schema *etcdpb.CollectionMeta) []*Blob {
insertData := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
1: &Int64FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int64{3, 4},
},
101: &Int8FieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []int8{3, 4},
},
108: &BinaryVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
},
109: &FloatVectorFieldData{
NumRows: 2,
NumRows: []int64{2},
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666},
Dim: 8,
},

View File

@ -14,9 +14,26 @@ package typeutil
import (
"encoding/binary"
"fmt"
"math"
"reflect"
)
// Float32ToByte converts a float to byte slice.
func Float32ToByte(float float32) []byte {
bits := math.Float32bits(float)
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, bits)
return bytes
}
// ByteToFloat32 converts a byte slice to float32.
func ByteToFloat32(bytes []byte) float32 {
bits := binary.LittleEndian.Uint32(bytes)
return math.Float32frombits(bits)
}
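These helpers move here from vector_chunk_manager.go; a float32 occupies four little-endian bytes. A quick round trip (values worked out by hand):

// 1.5 has IEEE-754 bits 0x3fc00000:
b := Float32ToByte(1.5) // []byte{0x00, 0x00, 0xc0, 0x3f}
f := ByteToFloat32(b)   // 1.5 again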
// BytesToInt64 converts a byte slice to int64.
func BytesToInt64(b []byte) (int64, error) {
if len(b) != 8 {