Use reader in the right way (#9790)

Avoid to mantaining the position myself.

Make Read bytes more easy to read and maintain.

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2021-10-13 21:32:33 +08:00 committed by GitHub
parent e481a2775a
commit c2c927bd55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 120 additions and 50 deletions

View File

@ -12,13 +12,17 @@
package datanode
import (
"bytes"
"context"
"encoding/binary"
"math"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -304,3 +308,64 @@ func TestDataSyncService_Start(t *testing.T) {
sync.close()
}
func genBytes() (rawData []byte) {
const DIM = 2
const N = 1
// Float vector
var fvector = [DIM]float32{1, 2}
for _, ele := range fvector {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
rawData = append(rawData, buf...)
}
// Binary vector
// Dimension of binary vector is 32
// size := 4, = 32 / 8
var bvector = []byte{255, 255, 255, 0}
rawData = append(rawData, bvector...)
// Bool
var fieldBool = true
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil {
panic(err)
}
rawData = append(rawData, buf.Bytes()...)
// int8
var dataInt8 int8 = 100
bint8 := new(bytes.Buffer)
if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil {
panic(err)
}
rawData = append(rawData, bint8.Bytes()...)
log.Debug("Rawdata length:", zap.Int("Length of rawData", len(rawData)))
return
}
func TestBytesReader(t *testing.T) {
rawData := genBytes()
// Bytes Reader is able to recording the position
rawDataReader := bytes.NewReader(rawData)
var fvector []float32 = make([]float32, 2)
binary.Read(rawDataReader, binary.LittleEndian, &fvector)
assert.ElementsMatch(t, fvector, []float32{1, 2})
var bvector []byte = make([]byte, 4)
binary.Read(rawDataReader, binary.LittleEndian, &bvector)
assert.ElementsMatch(t, bvector, []byte{255, 255, 255, 0})
var fieldBool bool
binary.Read(rawDataReader, binary.LittleEndian, &fieldBool)
assert.Equal(t, true, fieldBool)
var dataInt8 int8
binary.Read(rawDataReader, binary.LittleEndian, &dataInt8)
assert.Equal(t, int8(100), dataInt8)
}

View File

@ -17,10 +17,10 @@ import (
"encoding/binary"
"errors"
"fmt"
"io"
"path"
"strconv"
"sync"
"unsafe"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
@ -443,7 +443,6 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
idata := buffer.buffer
// 1.2 Get Fields
var pos int = 0 // Record position of blob
var fieldIDs []int64
var fieldTypes []schemapb.DataType
for _, field := range collSchema.Fields {
@ -451,6 +450,11 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
fieldTypes = append(fieldTypes, field.DataType)
}
blobReaders := make([]io.Reader, 0)
for _, blob := range msg.RowData {
blobReaders = append(blobReaders, bytes.NewReader(blob.GetValue()))
}
for _, field := range collSchema.Fields {
switch field.DataType {
case schemapb.DataType_FloatVector:
@ -475,18 +479,14 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)
for _, r := range blobReaders {
var v []float32 = make([]float32, dim)
var offset int
for _, blob := range msg.RowData {
offset = 0
for j := 0; j < dim; j++ {
var v float32
readBinary(blob.GetValue()[pos+offset:], &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
offset += int(unsafe.Sizeof(*(&v)))
}
readBinary(r, &v, field.DataType)
fieldData.Data = append(fieldData.Data, v...)
}
pos += offset
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_BinaryVector:
@ -511,13 +511,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
var offset int
for _, blob := range msg.RowData {
bv := blob.GetValue()[pos : pos+(dim/8)]
fieldData.Data = append(fieldData.Data, bv...)
offset = len(bv)
for _, r := range blobReaders {
var v []byte = make([]byte, dim/8)
readBinary(r, &v, field.DataType)
fieldData.Data = append(fieldData.Data, v...)
}
pos += offset
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Bool:
@ -529,12 +529,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
var v bool
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
for _, r := range blobReaders {
var v bool
readBinary(r, &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int8:
@ -546,12 +547,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
var v int8
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
for _, r := range blobReaders {
var v int8
readBinary(r, &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int16:
@ -563,12 +565,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
var v int16
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
for _, r := range blobReaders {
var v int16
readBinary(r, &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int32:
@ -580,12 +583,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
var v int32
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
for _, r := range blobReaders {
var v int32
readBinary(r, &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int64:
@ -607,12 +611,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
default:
var v int64
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
for _, r := range blobReaders {
var v int64
readBinary(r, &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
}
@ -625,12 +630,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
var v float32
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
for _, r := range blobReaders {
var v float32
readBinary(r, &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Double:
@ -642,13 +648,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
var v float64
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
for _, r := range blobReaders {
var v float64
readBinary(r, &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
}
}
@ -670,9 +676,8 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
// readBinary read data in bytes and write it into receiver.
// The receiver can be any type in int8, int16, int32, int64, float32, float64 and bool
// readBinary uses LittleEndian ByteOrder.
func readBinary(data []byte, receiver interface{}, dataType schemapb.DataType) {
buf := bytes.NewReader(data)
err := binary.Read(buf, binary.LittleEndian, receiver)
func readBinary(reader io.Reader, receiver interface{}, dataType schemapb.DataType) {
err := binary.Read(reader, binary.LittleEndian, receiver)
if err != nil {
log.Error("binary.Read failed", zap.Any("data type", dataType), zap.Error(err))
}