From c2c927bd5550c1b43499d7c5c6fefb75056242c0 Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Wed, 13 Oct 2021 21:32:33 +0800
Subject: [PATCH] Use reader in the right way (#9790)

Avoid maintaining the position myself.
Make reading bytes easier to follow and maintain.

Signed-off-by: yangxuan
---
 internal/datanode/data_sync_service_test.go   | 65 +++++++++++
 .../datanode/flow_graph_insert_buffer_node.go | 105 +++++++++---------
 2 files changed, 120 insertions(+), 50 deletions(-)

diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go
index ca003c31ae..78ad4884a5 100644
--- a/internal/datanode/data_sync_service_test.go
+++ b/internal/datanode/data_sync_service_test.go
@@ -12,13 +12,17 @@
 package datanode
 
 import (
+	"bytes"
 	"context"
+	"encoding/binary"
 	"math"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -304,3 +308,64 @@ func TestDataSyncService_Start(t *testing.T) {
 
 	sync.close()
 }
+
+func genBytes() (rawData []byte) {
+	const DIM = 2
+	const N = 1
+
+	// Float vector
+	var fvector = [DIM]float32{1, 2}
+	for _, ele := range fvector {
+		buf := make([]byte, 4)
+		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
+		rawData = append(rawData, buf...)
+	}
+
+	// Binary vector
+	// Dimension of binary vector is 32,
+	// so its size in bytes is 4 (32 / 8)
+	var bvector = []byte{255, 255, 255, 0}
+	rawData = append(rawData, bvector...)
+
+	// Bool
+	var fieldBool = true
+	buf := new(bytes.Buffer)
+	if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil {
+		panic(err)
+	}
+
+	rawData = append(rawData, buf.Bytes()...)
+
+	// int8
+	var dataInt8 int8 = 100
+	bint8 := new(bytes.Buffer)
+	if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil {
+		panic(err)
+	}
+	rawData = append(rawData, bint8.Bytes()...)
+	log.Debug("Rawdata length:", zap.Int("Length of rawData", len(rawData)))
+	return
+}
+
+func TestBytesReader(t *testing.T) {
+	rawData := genBytes()
+
+	// bytes.Reader keeps track of the read position
+	rawDataReader := bytes.NewReader(rawData)
+
+	var fvector []float32 = make([]float32, 2)
+	binary.Read(rawDataReader, binary.LittleEndian, &fvector)
+	assert.ElementsMatch(t, fvector, []float32{1, 2})
+
+	var bvector []byte = make([]byte, 4)
+	binary.Read(rawDataReader, binary.LittleEndian, &bvector)
+	assert.ElementsMatch(t, bvector, []byte{255, 255, 255, 0})
+
+	var fieldBool bool
+	binary.Read(rawDataReader, binary.LittleEndian, &fieldBool)
+	assert.Equal(t, true, fieldBool)
+
+	var dataInt8 int8
+	binary.Read(rawDataReader, binary.LittleEndian, &dataInt8)
+	assert.Equal(t, int8(100), dataInt8)
+}
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index c6d6061774..753b7646b1 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -17,10 +17,10 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"io"
 	"path"
 	"strconv"
 	"sync"
-	"unsafe"
 
 	"github.com/golang/protobuf/proto"
 	"github.com/opentracing/opentracing-go"
@@ -443,7 +443,6 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
 	idata := buffer.buffer
 
 	// 1.2 Get Fields
-	var pos int = 0 // Record position of blob
 	var fieldIDs []int64
 	var fieldTypes []schemapb.DataType
 	for _, field := range collSchema.Fields {
@@ -451,6 +450,11 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
 		fieldTypes = append(fieldTypes, field.DataType)
 	}
 
+	blobReaders := make([]io.Reader, 0)
+	for _, blob := range msg.RowData {
+		blobReaders = append(blobReaders, bytes.NewReader(blob.GetValue()))
+	}
+
 	for _, field := range collSchema.Fields {
 		switch field.DataType {
 		case schemapb.DataType_FloatVector:
@@ -475,18 +479,14 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
 			}
 
 			fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)
+			for _, r := range blobReaders {
+				var v []float32 = make([]float32, dim)
 
-			var offset int
-			for _, blob := range msg.RowData {
-				offset = 0
-				for j := 0; j < dim; j++ {
-					var v float32
-					readBinary(blob.GetValue()[pos+offset:], &v, field.DataType)
-					fieldData.Data = append(fieldData.Data, v)
-					offset += int(unsafe.Sizeof(*(&v)))
-				}
+				readBinary(r, &v, field.DataType)
+
+				fieldData.Data = append(fieldData.Data, v...)
 			}
-			pos += offset
+
 			fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
 
 		case schemapb.DataType_BinaryVector:
@@ -511,13 +511,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
 			}
 
 			fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
-			var offset int
-			for _, blob := range msg.RowData {
-				bv := blob.GetValue()[pos : pos+(dim/8)]
-				fieldData.Data = append(fieldData.Data, bv...)
-				offset = len(bv)
+			for _, r := range blobReaders {
+				var v []byte = make([]byte, dim/8)
+				readBinary(r, &v, field.DataType)
+
+				fieldData.Data = append(fieldData.Data, v...)
} - pos += offset + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Bool: @@ -529,12 +529,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos } fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData) - var v bool - for _, blob := range msg.RowData { - readBinary(blob.GetValue()[pos:], &v, field.DataType) + for _, r := range blobReaders { + var v bool + readBinary(r, &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) } - pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Int8: @@ -546,12 +547,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos } fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData) - var v int8 - for _, blob := range msg.RowData { - readBinary(blob.GetValue()[pos:], &v, field.DataType) + for _, r := range blobReaders { + var v int8 + readBinary(r, &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) } - pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Int16: @@ -563,12 +565,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos } fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData) - var v int16 - for _, blob := range msg.RowData { - readBinary(blob.GetValue()[pos:], &v, field.DataType) + for _, r := range blobReaders { + var v int16 + readBinary(r, &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) } - pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Int32: @@ -580,12 +583,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos } fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData) - var v int32 - for _, blob := range msg.RowData { - readBinary(blob.GetValue()[pos:], &v, field.DataType) + for _, r := range blobReaders { + var v int32 + readBinary(r, &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) } - pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Int64: @@ -607,12 +611,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos } fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) default: - var v int64 - for _, blob := range msg.RowData { - readBinary(blob.GetValue()[pos:], &v, field.DataType) + for _, r := range blobReaders { + var v int64 + readBinary(r, &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) } - pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) } @@ -625,12 +630,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos } fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData) - var v float32 - for _, blob := range msg.RowData { - readBinary(blob.GetValue()[pos:], &v, field.DataType) + + for _, r := range blobReaders { + var v float32 + readBinary(r, &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) } - pos += int(unsafe.Sizeof(*(&v))) fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) case schemapb.DataType_Double: @@ -642,13 +648,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos } fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData) - 
var v float64 - for _, blob := range msg.RowData { - readBinary(blob.GetValue()[pos:], &v, field.DataType) + + for _, r := range blobReaders { + var v float64 + readBinary(r, &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) } - - pos += int(unsafe.Sizeof(*(&v))) fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) } } @@ -670,9 +676,8 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos // readBinary read data in bytes and write it into receiver. // The receiver can be any type in int8, int16, int32, int64, float32, float64 and bool // readBinary uses LittleEndian ByteOrder. -func readBinary(data []byte, receiver interface{}, dataType schemapb.DataType) { - buf := bytes.NewReader(data) - err := binary.Read(buf, binary.LittleEndian, receiver) +func readBinary(reader io.Reader, receiver interface{}, dataType schemapb.DataType) { + err := binary.Read(reader, binary.LittleEndian, receiver) if err != nil { log.Error("binary.Read failed", zap.Any("data type", dataType), zap.Error(err)) }
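
Note: the pattern this patch adopts can be shown in isolation. A bytes.Reader satisfies io.Reader and advances its own read position, so consecutive binary.Read calls consume consecutive fields of a row without a hand-maintained `pos` offset. A minimal standalone sketch follows; the field values and layout in it are made up for illustration and are not taken from the patched code.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
)

func main() {
	// Encode one float32 followed by one int8, little-endian,
	// in the same spirit as the genBytes test helper.
	raw := make([]byte, 0, 5)
	buf := make([]byte, 4)
	binary.LittleEndian.PutUint32(buf, math.Float32bits(1.5))
	raw = append(raw, buf...)
	raw = append(raw, byte(100)) // one int8 value

	// bytes.Reader remembers how far it has read, so each binary.Read
	// picks up exactly where the previous one stopped.
	r := bytes.NewReader(raw)

	var f float32
	if err := binary.Read(r, binary.LittleEndian, &f); err != nil {
		panic(err)
	}

	var i int8
	if err := binary.Read(r, binary.LittleEndian, &i); err != nil {
		panic(err)
	}

	fmt.Println(f, i) // 1.5 100
}
```

A side benefit of reading through io.Reader is that a blob shorter than the receiver surfaces as io.EOF or io.ErrUnexpectedEOF from binary.Read, rather than an out-of-range slice of the raw byte buffer.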