From 62e900234839a813a53c509ce2c6efa298086d2a Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Fri, 11 Dec 2020 11:29:07 +0800
Subject: [PATCH] Add git-hooks for code checking

Signed-off-by: cai.zhang
---
 githooks/README.md                     |   7 +
 internal/storage/binlog_test.go        | 244 +++++++++++++++++++
 internal/storage/binlog_writer.go      |  15 +-
 internal/storage/binlog_writer_test.go |   6 +-
 internal/storage/data_codec.go         | 310 +++++++------------------
 internal/storage/data_codec_test.go    | 137 +++++++----
 internal/storage/event_data.go         |  38 +++
 internal/storage/event_test.go         |  97 ++++----
 internal/storage/event_writer_test.go  |   2 +
 9 files changed, 540 insertions(+), 316 deletions(-)
 create mode 100644 internal/storage/binlog_test.go

diff --git a/githooks/README.md b/githooks/README.md
index 205054c5c3..a2f1652550 100644
--- a/githooks/README.md
+++ b/githooks/README.md
@@ -1,5 +1,12 @@
 **If you want to use git hooks, you need to install hooks first!**
 
+## Install the git-hooks tool
+```shell script
+export GO111MODULE="on"
+go get -u github.com/git-hooks/git-hooks
+```
+
+## Install hooks
 run
 ```shell script
 git hooks install
diff --git a/internal/storage/binlog_test.go b/internal/storage/binlog_test.go
new file mode 100644
index 0000000000..75ff432ab2
--- /dev/null
+++ b/internal/storage/binlog_test.go
@@ -0,0 +1,244 @@
+package storage
+
+import (
+	"encoding/binary"
+	"testing"
+	"time"
+	"unsafe"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
+	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
+)
+
+func TestInsertBinlog(t *testing.T) {
+	w, err := NewInsertBinlogWriter(schemapb.DataType_INT64, 10, 20, 30, 40)
+	assert.Nil(t, err)
+
+	e1, err := w.NextInsertEventWriter()
+	assert.Nil(t, err)
+	err = e1.AddDataToPayload([]int64{1, 2, 3})
+	assert.Nil(t, err)
+	err = e1.AddDataToPayload([]int32{4, 5, 6})
+	assert.NotNil(t, err)
+	err = e1.AddDataToPayload([]int64{4, 5, 6})
+	assert.Nil(t, err)
+	e1.SetStartTimestamp(100)
+	e1.SetEndTimestamp(200)
+
+	e2, err := w.NextInsertEventWriter()
+	assert.Nil(t, err)
+	err = e2.AddDataToPayload([]int64{7, 8, 9})
+	assert.Nil(t, err)
+	err = e2.AddDataToPayload([]bool{true, false, true})
+	assert.NotNil(t, err)
+	err = e2.AddDataToPayload([]int64{10, 11, 12})
+	assert.Nil(t, err)
+	e2.SetStartTimestamp(300)
+	e2.SetEndTimestamp(400)
+
+	w.SetStartTimeStamp(1000)
+	w.SetEndTimeStamp(2000)
+
+	_, err = w.GetBuffer()
+	assert.NotNil(t, err)
+	err = w.Close()
+	assert.Nil(t, err)
+	buf, err := w.GetBuffer()
+	assert.Nil(t, err)
+
+	//magic number
+	magicNum := UnsafeReadInt32(buf, 0)
+	assert.Equal(t, magicNum, MagicNumber)
+	pos := int(unsafe.Sizeof(MagicNumber))
+
+	//descriptor header, timestamp
+	ts := UnsafeReadInt64(buf, pos)
+	assert.Greater(t, ts, int64(0))
+	curts := time.Now().UnixNano() / int64(time.Millisecond)
+	curts = int64(tsoutil.ComposeTS(curts, 0))
+	diffts := curts - ts
+	maxdiff := int64(tsoutil.ComposeTS(1000, 0))
+	assert.LessOrEqual(t, diffts, maxdiff)
+	pos += int(unsafe.Sizeof(ts))
+
+	//descriptor header, type code
+	tc := UnsafeReadInt8(buf, pos)
+	assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
+	pos += int(unsafe.Sizeof(tc))
+
+	//descriptor header, server id
+	svrID := UnsafeReadInt32(buf, pos)
+	assert.Equal(t, svrID, int32(ServerID))
+	pos += int(unsafe.Sizeof(svrID))
+
+	//descriptor header, event length
+	descEventLen := UnsafeReadInt32(buf, pos)
+	pos += int(unsafe.Sizeof(descEventLen))
+
+	//descriptor header, next position
+	descNxtPos := UnsafeReadInt32(buf, 
pos) + assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //descriptor data fix, binlog version + binLogVer := UnsafeReadInt16(buf, pos) + assert.Equal(t, binLogVer, int16(BinlogVersion)) + pos += int(unsafe.Sizeof(binLogVer)) + + //descriptor data fix, server version + svrVer := UnsafeReadInt64(buf, pos) + assert.Equal(t, svrVer, int64(ServerVersion)) + pos += int(unsafe.Sizeof(svrVer)) + + //descriptor data fix, commit id + cmitID := UnsafeReadInt64(buf, pos) + assert.Equal(t, cmitID, int64(CommitID)) + pos += int(unsafe.Sizeof(cmitID)) + + //descriptor data fix, header length + headLen := UnsafeReadInt8(buf, pos) + assert.Equal(t, headLen, int8(binary.Size(eventHeader{}))) + pos += int(unsafe.Sizeof(headLen)) + + //descriptor data fix, collection id + collID := UnsafeReadInt64(buf, pos) + assert.Equal(t, collID, int64(10)) + pos += int(unsafe.Sizeof(collID)) + + //descriptor data fix, partition id + partID := UnsafeReadInt64(buf, pos) + assert.Equal(t, partID, int64(20)) + pos += int(unsafe.Sizeof(partID)) + + //descriptor data fix, segment id + segID := UnsafeReadInt64(buf, pos) + assert.Equal(t, segID, int64(30)) + pos += int(unsafe.Sizeof(segID)) + + //descriptor data fix, field id + fieldID := UnsafeReadInt64(buf, pos) + assert.Equal(t, fieldID, int64(40)) + pos += int(unsafe.Sizeof(fieldID)) + + //descriptor data fix, start time stamp + startts := UnsafeReadInt64(buf, pos) + assert.Equal(t, startts, int64(1000)) + pos += int(unsafe.Sizeof(startts)) + + //descriptor data fix, end time stamp + endts := UnsafeReadInt64(buf, pos) + assert.Equal(t, endts, int64(2000)) + pos += int(unsafe.Sizeof(endts)) + + //descriptor data fix, payload type + colType := UnsafeReadInt32(buf, pos) + assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64) + pos += int(unsafe.Sizeof(colType)) + + //descriptor data, post header lengths + for i := DescriptorEventType; i < EventTypeEnd; i++ { + size := getEventFixPartSize(i) + assert.Equal(t, uint8(size), buf[pos]) + pos++ + } + + //start of e1 + assert.Equal(t, pos, int(descNxtPos)) + + //insert e1 header, Timestamp + e1ts := UnsafeReadInt64(buf, pos) + diffts = curts - e1ts + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(e1ts)) + + //insert e1 header, type code + e1tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(e1tc), InsertEventType) + pos += int(unsafe.Sizeof(e1tc)) + + //insert e1 header, Server id + e1svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, e1svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(e1svrID)) + + //insert e1 header, event length + e1EventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(e1EventLen)) + + //insert e1 header, next position + e1NxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //insert e1 data, start time stamp + e1st := UnsafeReadInt64(buf, pos) + assert.Equal(t, e1st, int64(100)) + pos += int(unsafe.Sizeof(e1st)) + + //insert e1 data, end time stamp + e1et := UnsafeReadInt64(buf, pos) + assert.Equal(t, e1et, int64(200)) + pos += int(unsafe.Sizeof(e1et)) + + //insert e1, payload + e1Payload := buf[pos:e1NxtPos] + e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload) + assert.Nil(t, err) + e1a, err := e1r.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6}) + err = e1r.Close() + assert.Nil(t, err) + + //start of e2 + pos = int(e1NxtPos) + + //insert e2 header, 
Timestamp + e2ts := UnsafeReadInt64(buf, pos) + diffts = curts - e2ts + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(e2ts)) + + //insert e2 header, type code + e2tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(e2tc), InsertEventType) + pos += int(unsafe.Sizeof(e2tc)) + + //insert e2 header, Server id + e2svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, e2svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(e2svrID)) + + //insert e2 header, event length + e2EventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(e2EventLen)) + + //insert e2 header, next position + e2NxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //insert e2 data, start time stamp + e2st := UnsafeReadInt64(buf, pos) + assert.Equal(t, e2st, int64(300)) + pos += int(unsafe.Sizeof(e2st)) + + //insert e2 data, end time stamp + e2et := UnsafeReadInt64(buf, pos) + assert.Equal(t, e2et, int64(400)) + pos += int(unsafe.Sizeof(e2et)) + + //insert e2, payload + e2Payload := buf[pos:] + e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload) + assert.Nil(t, err) + e2a, err := e2r.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12}) + err = e2r.Close() + assert.Nil(t, err) + + assert.Equal(t, int(e2NxtPos), len(buf)) + +} diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 0c0adb0ecb..a84faed97d 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -77,6 +77,13 @@ func (writer *baseBinlogWriter) Close() error { if writer.buffer != nil { return nil } + if writer.StartTimestamp == 0 { + return errors.New("hasn't set start time stamp") + } + if writer.EndTimestamp == 0 { + return errors.New("hasn't set end time stamp") + } + var offset int32 writer.buffer = new(bytes.Buffer) if err := binary.Write(writer.buffer, binary.LittleEndian, int32(MagicNumber)); err != nil { @@ -85,7 +92,7 @@ func (writer *baseBinlogWriter) Close() error { if err := writer.descriptorEvent.Write(writer.buffer); err != nil { return err } - offset = writer.descriptorEvent.GetMemoryUsageInBytes() + offset = writer.descriptorEvent.GetMemoryUsageInBytes() + int32(binary.Size(MagicNumber)) writer.length = 0 for _, w := range writer.eventWriters { w.SetOffset(offset) @@ -196,12 +203,16 @@ func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEve return event, nil } -func NewInsertBinlogWriter(dataType schemapb.DataType) (*InsertBinlogWriter, error) { +func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID, FieldID int64) (*InsertBinlogWriter, error) { descriptorEvent, err := newDescriptorEvent() if err != nil { return nil, err } descriptorEvent.PayloadDataType = dataType + descriptorEvent.CollectionID = collectionID + descriptorEvent.PartitionID = partitionID + descriptorEvent.SegmentID = segmentID + descriptorEvent.FieldID = FieldID return &InsertBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ descriptorEvent: *descriptorEvent, diff --git a/internal/storage/binlog_writer_test.go b/internal/storage/binlog_writer_test.go index f2c9729b64..fd313b0df7 100644 --- a/internal/storage/binlog_writer_test.go +++ b/internal/storage/binlog_writer_test.go @@ -10,7 +10,9 @@ import ( ) func TestBinlogWriterReader(t *testing.T) { - binlogWriter, err := NewInsertBinlogWriter(schemapb.DataType_INT32) + binlogWriter, err := NewInsertBinlogWriter(schemapb.DataType_INT32, 10, 20, 
30, 40) + binlogWriter.SetStartTimeStamp(1000) + binlogWriter.SetEndTimeStamp(2000) defer binlogWriter.Close() assert.Nil(t, err) eventWriter, err := binlogWriter.NextInsertEventWriter() @@ -19,6 +21,8 @@ func TestBinlogWriterReader(t *testing.T) { assert.Nil(t, err) _, err = binlogWriter.GetBuffer() assert.NotNil(t, err) + eventWriter.SetStartTimestamp(1000) + eventWriter.SetEndTimestamp(2000) err = binlogWriter.Close() assert.Nil(t, err) assert.EqualValues(t, 1, binlogWriter.GetEventNums()) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 42c0f58b61..55fc3b1252 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -2,8 +2,6 @@ package storage import ( "fmt" - "strconv" - "strings" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" @@ -11,8 +9,8 @@ import ( ) const ( - TsField int = 1 - RequestField int = 100 + TsField int64 = 1 + DDLField int64 = 2 ) type ( @@ -77,11 +75,10 @@ type FloatVectorFieldData struct { dim int } -// TODO: more types of FieldData - // system filed id: // 0: unique row id // 1: timestamp +// 2: ddl // 100: first user field id // 101: second user field id // 102: ... @@ -89,187 +86,71 @@ type FloatVectorFieldData struct { // example row_schema: {float_field, int_field, float_vector_field, string_field} // Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>} type InsertData struct { - Data map[int]FieldData // field id to field data + Data map[int64]FieldData // field id to field data } // Blob key example: -// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx} +// ${segment_id}/${field_id} type InsertCodec struct { Base readerCloseFunc []func() error } -func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) { +func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) { var blobs []*Blob var writer *InsertBinlogWriter var err error ts := (data.Data[1]).(Int64FieldData).data - for fieldID, value := range data.Data { - switch singleData := value.(type) { - case BoolFieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_BOOL) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - err = eventWriter.AddBoolToPayload(singleData.data) - if err != nil { - return nil, err - } - case Int8FieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_INT8) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - err = eventWriter.AddInt8ToPayload(singleData.data) - if err != nil { - return nil, err - } - case Int16FieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_INT16) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - err = 
eventWriter.AddInt16ToPayload(singleData.data) - if err != nil { - return nil, err - } - case Int32FieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_INT32) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - err = eventWriter.AddInt32ToPayload(singleData.data) - if err != nil { - return nil, err - } - case Int64FieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_INT64) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - err = eventWriter.AddInt64ToPayload(singleData.data) - if err != nil { - return nil, err - } - case FloatFieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_FLOAT) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - err = eventWriter.AddFloatToPayload(singleData.data) - if err != nil { - return nil, err - } - case DoubleFieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_DOUBLE) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - err = eventWriter.AddDoubleToPayload(singleData.data) - if err != nil { - return nil, err - } - case StringFieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_STRING) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - for _, singleString := range singleData.data { + for _, field := range insertCodec.Schema.Schema.Fields { + singleData := data.Data[field.FieldID] + writer, err = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID) + if err != nil { + return nil, err + } + eventWriter, err := writer.NextInsertEventWriter() + if err != nil { + return nil, err + } + eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) + eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) + switch field.DataType { + case schemapb.DataType_BOOL: + err = eventWriter.AddBoolToPayload(singleData.(BoolFieldData).data) + case schemapb.DataType_INT8: + err = eventWriter.AddInt8ToPayload(singleData.(Int8FieldData).data) + case schemapb.DataType_INT16: + err = eventWriter.AddInt16ToPayload(singleData.(Int16FieldData).data) + case schemapb.DataType_INT32: + err = eventWriter.AddInt32ToPayload(singleData.(Int32FieldData).data) + case schemapb.DataType_INT64: + err = eventWriter.AddInt64ToPayload(singleData.(Int64FieldData).data) + case schemapb.DataType_FLOAT: + err = eventWriter.AddFloatToPayload(singleData.(FloatFieldData).data) + case schemapb.DataType_DOUBLE: + err = eventWriter.AddDoubleToPayload(singleData.(DoubleFieldData).data) + case schemapb.DataType_STRING: + for _, singleString := range 
singleData.(StringFieldData).data { err = eventWriter.AddOneStringToPayload(singleString) + if err != nil { + return nil, err + } } - if err != nil { - return nil, err - } - case BinaryVectorFieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_VECTOR_BINARY) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - err = eventWriter.AddBinaryVectorToPayload(singleData.data, singleData.dim) - if err != nil { - return nil, err - } - case FloatVectorFieldData: - writer, err = NewInsertBinlogWriter(schemapb.DataType_VECTOR_FLOAT) - if err != nil { - return nil, err - } - eventWriter, err := writer.NextInsertEventWriter() - if err != nil { - return nil, err - } - eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0])) - eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1])) - err = eventWriter.AddFloatVectorToPayload(singleData.data, singleData.dim) - if err != nil { - return nil, err - } + case schemapb.DataType_VECTOR_BINARY: + err = eventWriter.AddBinaryVectorToPayload(singleData.(BinaryVectorFieldData).data, singleData.(BinaryVectorFieldData).dim) + case schemapb.DataType_VECTOR_FLOAT: + err = eventWriter.AddFloatVectorToPayload(singleData.(FloatVectorFieldData).data, singleData.(FloatVectorFieldData).dim) + } + if err != nil { + return nil, err } if writer == nil { return nil, fmt.Errorf("binlog writer is nil") } - writer.CollectionID = insertCodec.Schema.ID - writer.PartitionID = partitionID - writer.SegmentID = segmentID writer.SetStartTimeStamp(typeutil.Timestamp(ts[0])) writer.SetEndTimeStamp(typeutil.Timestamp(ts[len(ts)-1])) - err := writer.Close() + err = writer.Close() if err != nil { return nil, err } @@ -278,8 +159,7 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm if err != nil { return nil, err } - blobKey := fmt.Sprintf("%d/insert_log/%d/%d/%d/%d/%d", - insertCodec.TenantID, insertCodec.Schema.ID, partitionID, segmentID, fieldID, logIdx) + blobKey := fmt.Sprintf("%d/%d", segmentID, field.FieldID) blobs = append(blobs, &Blob{ key: blobKey, value: buffer, @@ -296,24 +176,23 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID readerClose := func(reader *BinlogReader) func() error { return func() error { return reader.Close() } } - pID, _ := strconv.ParseInt(strings.Split(blobs[0].key, "/")[3], 0, 10) - sID, _ := strconv.ParseInt(strings.Split(blobs[0].key, "/")[4], 0, 10) var resultData InsertData - resultData.Data = make(map[int]FieldData) + var pID UniqueID + var sID UniqueID + resultData.Data = make(map[int64]FieldData) for _, blob := range blobs { - fieldID, err := strconv.Atoi(strings.Split(blob.key, "/")[5]) - + binlogReader, err := NewBinlogReader(blob.value) if err != nil { return -1, -1, nil, err } - dataType := insertCodec.Schema.Schema.Fields[fieldID].GetDataType() + // read partitionID and SegmentID + pID, sID = binlogReader.PartitionID, binlogReader.SegmentID + + dataType := binlogReader.PayloadDataType + fieldID := binlogReader.FieldID switch dataType { case schemapb.DataType_BOOL: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var boolFieldData BoolFieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -327,10 +206,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID 
resultData.Data[fieldID] = boolFieldData insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader)) case schemapb.DataType_INT8: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var int8FieldData Int8FieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -344,10 +219,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID resultData.Data[fieldID] = int8FieldData insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader)) case schemapb.DataType_INT16: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var int16FieldData Int16FieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -361,10 +232,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID resultData.Data[fieldID] = int16FieldData insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader)) case schemapb.DataType_INT32: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var int32FieldData Int32FieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -378,10 +245,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID resultData.Data[fieldID] = int32FieldData insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader)) case schemapb.DataType_INT64: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var int64FieldData Int64FieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -395,10 +258,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID resultData.Data[fieldID] = int64FieldData insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader)) case schemapb.DataType_FLOAT: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var floatFieldData FloatFieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -412,10 +271,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID resultData.Data[fieldID] = floatFieldData insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader)) case schemapb.DataType_DOUBLE: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var doubleFieldData DoubleFieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -429,10 +284,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID resultData.Data[fieldID] = doubleFieldData insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader)) case schemapb.DataType_STRING: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var stringFieldData StringFieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -453,10 +304,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID resultData.Data[fieldID] = stringFieldData insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader)) case schemapb.DataType_VECTOR_BINARY: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var 
binaryVectorFieldData BinaryVectorFieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -470,10 +317,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID resultData.Data[fieldID] = binaryVectorFieldData insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader)) case schemapb.DataType_VECTOR_FLOAT: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return -1, -1, nil, err - } var floatVectorFieldData FloatVectorFieldData eventReader, err := binlogReader.NextEventReader() if err != nil { @@ -503,13 +346,13 @@ func (insertCodec *InsertCodec) Close() error { } // Blob key example: -// ${tenant}/data_definition_log/${collection_id}/${field_type}/${log_idx} +// ${collection_id} type DataDefinitionCodec struct { Base readerCloseFunc []func() error } -func (dataDefinitionCodec *DataDefinitionCodec) Serialize(logIdx int, ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) { +func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) { writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING) if err != nil { return nil, err @@ -565,6 +408,9 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(logIdx int, ts []Times } } } + writer.FieldID = DDLField + writer.SetStartTimeStamp(ts[0]) + writer.SetEndTimeStamp(ts[len(ts)-1]) err = writer.Close() if err != nil { return nil, err @@ -573,8 +419,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(logIdx int, ts []Times if err != nil { return nil, err } - blobKey := fmt.Sprintf("%d/data_definition_log/%d/%d/%d", - dataDefinitionCodec.TenantID, dataDefinitionCodec.Schema.ID, RequestField, logIdx) + blobKey := fmt.Sprintf("%d/%d", dataDefinitionCodec.Schema.ID, DDLField) blobs = append(blobs, &Blob{ key: blobKey, value: buffer, @@ -597,6 +442,11 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(logIdx int, ts []Times if err != nil { return nil, err } + eventWriter.SetStartTimestamp(ts[0]) + eventWriter.SetEndTimestamp(ts[len(ts)-1]) + writer.SetStartTimeStamp(ts[0]) + writer.SetEndTimeStamp(ts[len(ts)-1]) + writer.FieldID = TsField err = writer.Close() if err != nil { return nil, err @@ -605,8 +455,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(logIdx int, ts []Times if err != nil { return nil, err } - blobKey = fmt.Sprintf("%d/data_definition_log/%d/%d/%d", - dataDefinitionCodec.TenantID, dataDefinitionCodec.Schema.ID, TsField, logIdx) + blobKey = fmt.Sprintf("%d/%d", dataDefinitionCodec.Schema.ID, TsField) blobs = append(blobs, &Blob{ key: blobKey, value: buffer, @@ -626,17 +475,14 @@ func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts [ var requestsStrings []string var resultTs []Timestamp for _, blob := range blobs { - fieldID, err := strconv.Atoi(strings.Split(blob.key, "/")[3]) - + binlogReader, err := NewBinlogReader(blob.value) if err != nil { return nil, nil, err } + fieldID := binlogReader.FieldID + switch fieldID { case TsField: - binlogReader, err := NewBinlogReader(blob.value) - if err != nil { - return nil, nil, err - } eventReader, err := binlogReader.NextEventReader() if err != nil { return nil, nil, err @@ -649,7 +495,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts [ resultTs = append(resultTs, Timestamp(singleTs)) } dataDefinitionCodec.readerCloseFunc = append(dataDefinitionCodec.readerCloseFunc, 
readerClose(binlogReader)) - case RequestField: + case DDLField: binlogReader, err := NewBinlogReader(blob.value) if err != nil { return nil, nil, err @@ -692,3 +538,15 @@ func (dataDefinitionCodec *DataDefinitionCodec) Close() error { } return nil } + +type IndexCodec struct { + Base +} + +func (indexCodec *IndexCodec) Serialize(blobs []*Blob) ([]*Blob, error) { + return blobs, nil +} + +func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, error) { + return blobs, nil +} diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 34cb252fa6..f7813e12d5 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -8,7 +8,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) -func TestInsertCodecWriter(t *testing.T) { +func TestInsertCodec(t *testing.T) { base := Base{ Version: 1, CommitID: 1, @@ -24,63 +24,80 @@ func TestInsertCodecWriter(t *testing.T) { AutoID: true, Fields: []*schemapb.FieldSchema{ { - Name: "field_bool", + FieldID: 1, + Name: "Ts", IsPrimaryKey: false, - Description: "description_1", - DataType: schemapb.DataType_BOOL, - }, - { - Name: "field_int64", - IsPrimaryKey: false, - Description: "description_1", + Description: "Ts", DataType: schemapb.DataType_INT64, }, { - Name: "field_int16", + FieldID: 100, + Name: "field_bool", IsPrimaryKey: false, - Description: "description_1", - DataType: schemapb.DataType_INT16, - }, - { - Name: "field_int32", - IsPrimaryKey: false, - Description: "description_1", - DataType: schemapb.DataType_INT32, + Description: "description_2", + DataType: schemapb.DataType_BOOL, }, { + FieldID: 101, Name: "field_int8", IsPrimaryKey: false, - Description: "description_1", + Description: "description_3", DataType: schemapb.DataType_INT8, }, { + FieldID: 102, + Name: "field_int16", + IsPrimaryKey: false, + Description: "description_4", + DataType: schemapb.DataType_INT16, + }, + { + FieldID: 103, + Name: "field_int32", + IsPrimaryKey: false, + Description: "description_5", + DataType: schemapb.DataType_INT32, + }, + { + FieldID: 104, + Name: "field_int64", + IsPrimaryKey: false, + Description: "description_6", + DataType: schemapb.DataType_INT64, + }, + { + FieldID: 105, Name: "field_float", IsPrimaryKey: false, - Description: "description_1", + Description: "description_7", DataType: schemapb.DataType_FLOAT, }, { + FieldID: 106, Name: "field_double", IsPrimaryKey: false, - Description: "description_1", + Description: "description_8", DataType: schemapb.DataType_DOUBLE, }, { + FieldID: 107, Name: "field_string", IsPrimaryKey: false, - Description: "description_1", + Description: "description_9", DataType: schemapb.DataType_STRING, }, { + FieldID: 108, Name: "field_binary_vector", IsPrimaryKey: false, - Description: "description_1", + Description: "description_10", DataType: schemapb.DataType_VECTOR_BINARY, }, { + FieldID: 109, Name: "field_float_vector", IsPrimaryKey: false, - Description: "description_1", + Description: "description_11", DataType: schemapb.DataType_VECTOR_FLOAT, }, }, @@ -92,61 +109,65 @@ func TestInsertCodecWriter(t *testing.T) { make([]func() error, 0), } insertData := &InsertData{ - Data: map[int]FieldData{ - 0: BoolFieldData{ - NumRows: 2, - data: []bool{true, false}, - }, + Data: map[int64]FieldData{ 1: Int64FieldData{ NumRows: 2, data: []int64{1, 2}, }, - 2: Int16FieldData{ + 100: BoolFieldData{ NumRows: 2, - data: []int16{1, 2}, + data: []bool{true, false}, }, - 3: Int32FieldData{ - NumRows: 2, - data: []int32{1, 2}, - 
}, - 4: Int8FieldData{ + 101: Int8FieldData{ NumRows: 2, data: []int8{1, 2}, }, - 5: FloatFieldData{ + 102: Int16FieldData{ + NumRows: 2, + data: []int16{1, 2}, + }, + 103: Int32FieldData{ + NumRows: 2, + data: []int32{1, 2}, + }, + 104: Int64FieldData{ + NumRows: 2, + data: []int64{1, 2}, + }, + 105: FloatFieldData{ NumRows: 2, data: []float32{1, 2}, }, - 6: DoubleFieldData{ + 106: DoubleFieldData{ NumRows: 2, data: []float64{1, 2}, }, - 7: StringFieldData{ + 107: StringFieldData{ NumRows: 2, data: []string{"1", "2"}, }, - 8: BinaryVectorFieldData{ + 108: BinaryVectorFieldData{ NumRows: 8, data: []byte{0, 255, 0, 1, 0, 1, 0, 1}, dim: 8, }, - 9: FloatVectorFieldData{ + 109: FloatVectorFieldData{ NumRows: 1, data: []float32{0, 1, 2, 3, 4, 5, 6, 7}, dim: 8, }, }, } - blobs, err := insertCodec.Serialize(1, 1, 1, insertData) + blobs, err := insertCodec.Serialize(1, 1, insertData) assert.Nil(t, err) partitionID, segmentID, resultData, err := insertCodec.Deserialize(blobs) assert.Nil(t, err) assert.Equal(t, partitionID, int64(1)) assert.Equal(t, segmentID, int64(1)) - assert.Equal(t, insertData, resultData) + assert.Equal(t, resultData, insertData) assert.Nil(t, insertCodec.Close()) } -func TestDDCodecWriter(t *testing.T) { +func TestDDCodec(t *testing.T) { base := Base{ Version: 1, CommitID: 1, @@ -217,7 +238,7 @@ func TestDDCodecWriter(t *testing.T) { CreatePartitionEventType, DropPartitionEventType, } - blobs, err := dataDefinitionCodec.Serialize(1, ts, ddRequests, eventTypeCodes) + blobs, err := dataDefinitionCodec.Serialize(ts, ddRequests, eventTypeCodes) assert.Nil(t, err) resultTs, resultRequests, err := dataDefinitionCodec.Deserialize(blobs) assert.Nil(t, err) @@ -225,3 +246,29 @@ func TestDDCodecWriter(t *testing.T) { assert.Equal(t, resultRequests, ddRequests) assert.Nil(t, dataDefinitionCodec.Close()) } + +func TestIndexCodec(t *testing.T) { + indexCodec := &IndexCodec{ + Base{}, + } + blobs := []*Blob{ + { + "12345", + []byte{1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7}, + }, + { + "6666", + []byte{6, 6, 6, 6, 6, 1, 2, 3, 4, 5, 6, 7}, + }, + { + "8885", + []byte{8, 8, 8, 8, 8, 8, 8, 8, 2, 3, 4, 5, 6, 7}, + }, + } + blobsInput, err := indexCodec.Serialize(blobs) + assert.Nil(t, err) + assert.Equal(t, blobs, blobsInput) + blobsOutput, err := indexCodec.Deserialize(blobs) + assert.Nil(t, err) + assert.Equal(t, blobsOutput, blobsInput) +} diff --git a/internal/storage/event_data.go b/internal/storage/event_data.go index 09c2969c7b..5a0d0671bb 100644 --- a/internal/storage/event_data.go +++ b/internal/storage/event_data.go @@ -22,6 +22,7 @@ type DescriptorEventDataFixPart struct { CollectionID int64 PartitionID int64 SegmentID int64 + FieldID int64 StartTimestamp typeutil.Timestamp EndTimestamp typeutil.Timestamp PayloadDataType schemapb.DataType @@ -92,6 +93,12 @@ func (data *insertEventData) GetEventDataFixPartSize() int32 { } func (data *insertEventData) WriteEventData(buffer io.Writer) error { + if data.StartTimestamp == 0 { + return errors.New("hasn't set start time stamp") + } + if data.EndTimestamp == 0 { + return errors.New("hasn't set end time stamp") + } return binary.Write(buffer, binary.LittleEndian, data) } @@ -113,6 +120,12 @@ func (data *deleteEventData) GetEventDataFixPartSize() int32 { } func (data *deleteEventData) WriteEventData(buffer io.Writer) error { + if data.StartTimestamp == 0 { + return errors.New("hasn't set start time stamp") + } + if data.EndTimestamp == 0 { + return errors.New("hasn't set end time stamp") + } return binary.Write(buffer, binary.LittleEndian, 
data) } @@ -134,6 +147,12 @@ func (data *createCollectionEventData) GetEventDataFixPartSize() int32 { } func (data *createCollectionEventData) WriteEventData(buffer io.Writer) error { + if data.StartTimestamp == 0 { + return errors.New("hasn't set start time stamp") + } + if data.EndTimestamp == 0 { + return errors.New("hasn't set end time stamp") + } return binary.Write(buffer, binary.LittleEndian, data) } @@ -155,6 +174,12 @@ func (data *dropCollectionEventData) GetEventDataFixPartSize() int32 { } func (data *dropCollectionEventData) WriteEventData(buffer io.Writer) error { + if data.StartTimestamp == 0 { + return errors.New("hasn't set start time stamp") + } + if data.EndTimestamp == 0 { + return errors.New("hasn't set end time stamp") + } return binary.Write(buffer, binary.LittleEndian, data) } @@ -176,6 +201,12 @@ func (data *createPartitionEventData) GetEventDataFixPartSize() int32 { } func (data *createPartitionEventData) WriteEventData(buffer io.Writer) error { + if data.StartTimestamp == 0 { + return errors.New("hasn't set start time stamp") + } + if data.EndTimestamp == 0 { + return errors.New("hasn't set end time stamp") + } return binary.Write(buffer, binary.LittleEndian, data) } @@ -197,6 +228,12 @@ func (data *dropPartitionEventData) GetEventDataFixPartSize() int32 { } func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error { + if data.StartTimestamp == 0 { + return errors.New("hasn't set start time stamp") + } + if data.EndTimestamp == 0 { + return errors.New("hasn't set end time stamp") + } return binary.Write(buffer, binary.LittleEndian, data) } @@ -230,6 +267,7 @@ func newDescriptorEventData() (*descriptorEventData, error) { CollectionID: -1, PartitionID: -1, SegmentID: -1, + FieldID: -1, StartTimestamp: 0, EndTimestamp: 0, PayloadDataType: -1, diff --git a/internal/storage/event_test.go b/internal/storage/event_test.go index 60dcc5d603..967b3cb7d9 100644 --- a/internal/storage/event_test.go +++ b/internal/storage/event_test.go @@ -20,17 +20,17 @@ func checkEventHeader( length int32) { ts := UnsafeReadInt64(buf, 0) assert.Greater(t, ts, int64(0)) - curts := time.Now().UnixNano() / int64(time.Millisecond) - curts = int64(tsoutil.ComposeTS(curts, 0)) - assert.GreaterOrEqual(t, curts, ts) + curTs := time.Now().UnixNano() / int64(time.Millisecond) + curTs = int64(tsoutil.ComposeTS(curTs, 0)) + assert.GreaterOrEqual(t, curTs, ts) utc := UnsafeReadInt8(buf, int(unsafe.Sizeof(ts))) assert.Equal(t, EventTypeCode(utc), tc) - usid := UnsafeReadInt32(buf, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc))) - assert.Equal(t, usid, svrID) - elen := UnsafeReadInt32(buf, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usid))) + usID := UnsafeReadInt32(buf, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc))) + assert.Equal(t, usID, svrID) + elen := UnsafeReadInt32(buf, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usID))) assert.Equal(t, elen, length) - npos := UnsafeReadInt32(buf, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usid)+unsafe.Sizeof(elen))) - assert.Equal(t, npos, length) + nPos := UnsafeReadInt32(buf, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usID)+unsafe.Sizeof(elen))) + assert.Equal(t, nPos, length) } func TestDescriptorEvent(t *testing.T) { @@ -46,80 +46,92 @@ func TestDescriptorEvent(t *testing.T) { ts := UnsafeReadInt64(buffer, 0) assert.Greater(t, ts, int64(0)) - curts := time.Now().UnixNano() / int64(time.Millisecond) - curts = int64(tsoutil.ComposeTS(curts, 0)) - assert.GreaterOrEqual(t, curts, ts) + curTs := 
time.Now().UnixNano() / int64(time.Millisecond) + curTs = int64(tsoutil.ComposeTS(curTs, 0)) + assert.GreaterOrEqual(t, curTs, ts) utc := UnsafeReadInt8(buffer, int(unsafe.Sizeof(ts))) assert.Equal(t, EventTypeCode(utc), DescriptorEventType) - usid := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc))) - assert.Equal(t, usid, int32(ServerID)) - elen := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usid))) + usID := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc))) + assert.Equal(t, usID, int32(ServerID)) + elen := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usID))) assert.Equal(t, elen, int32(len(buffer))) - npos := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usid)+unsafe.Sizeof(elen))) - assert.GreaterOrEqual(t, npos, int32(binary.Size(MagicNumber)+len(buffer))) - t.Logf("next position = %d", npos) + nPos := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usID)+unsafe.Sizeof(elen))) + assert.GreaterOrEqual(t, nPos, int32(binary.Size(MagicNumber)+len(buffer))) + t.Logf("next position = %d", nPos) binVersion := UnsafeReadInt16(buffer, binary.Size(eventHeader{})) assert.Equal(t, binVersion, int16(BinlogVersion)) svrVersion := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+int(unsafe.Sizeof(binVersion))) assert.Equal(t, svrVersion, int64(ServerVersion)) - cmitID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+int(unsafe.Sizeof(binVersion))+int(unsafe.Sizeof(svrVersion))) - assert.Equal(t, cmitID, int64(CommitID)) + commitID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+int(unsafe.Sizeof(binVersion))+int(unsafe.Sizeof(svrVersion))) + assert.Equal(t, commitID, int64(CommitID)) headLen := UnsafeReadInt8(buffer, binary.Size(eventHeader{})+ int(unsafe.Sizeof(binVersion))+ int(unsafe.Sizeof(svrVersion))+ - int(unsafe.Sizeof(cmitID))) + int(unsafe.Sizeof(commitID))) assert.Equal(t, headLen, int8(binary.Size(eventHeader{}))) t.Logf("head len = %d", headLen) collID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ int(unsafe.Sizeof(binVersion))+ int(unsafe.Sizeof(svrVersion))+ - int(unsafe.Sizeof(cmitID))+ + int(unsafe.Sizeof(commitID))+ int(unsafe.Sizeof(headLen))) assert.Equal(t, collID, int64(-1)) partID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ int(unsafe.Sizeof(binVersion))+ int(unsafe.Sizeof(svrVersion))+ - int(unsafe.Sizeof(cmitID))+ + int(unsafe.Sizeof(commitID))+ int(unsafe.Sizeof(headLen))+ int(unsafe.Sizeof(collID))) assert.Equal(t, partID, int64(-1)) segID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ int(unsafe.Sizeof(binVersion))+ int(unsafe.Sizeof(svrVersion))+ - int(unsafe.Sizeof(cmitID))+ + int(unsafe.Sizeof(commitID))+ int(unsafe.Sizeof(headLen))+ int(unsafe.Sizeof(collID))+ int(unsafe.Sizeof(partID))) assert.Equal(t, segID, int64(-1)) - startTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ + fieldID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ int(unsafe.Sizeof(binVersion))+ int(unsafe.Sizeof(svrVersion))+ - int(unsafe.Sizeof(cmitID))+ + int(unsafe.Sizeof(commitID))+ int(unsafe.Sizeof(headLen))+ int(unsafe.Sizeof(collID))+ int(unsafe.Sizeof(partID))+ int(unsafe.Sizeof(segID))) - assert.Equal(t, startTs, int64(0)) - endTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ + assert.Equal(t, fieldID, int64(-1)) + startTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ int(unsafe.Sizeof(binVersion))+ int(unsafe.Sizeof(svrVersion))+ - 
int(unsafe.Sizeof(cmitID))+ + int(unsafe.Sizeof(commitID))+ int(unsafe.Sizeof(headLen))+ int(unsafe.Sizeof(collID))+ int(unsafe.Sizeof(partID))+ int(unsafe.Sizeof(segID))+ + int(unsafe.Sizeof(fieldID))) + assert.Equal(t, startTs, int64(0)) + endTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ + int(unsafe.Sizeof(binVersion))+ + int(unsafe.Sizeof(svrVersion))+ + int(unsafe.Sizeof(commitID))+ + int(unsafe.Sizeof(headLen))+ + int(unsafe.Sizeof(collID))+ + int(unsafe.Sizeof(partID))+ + int(unsafe.Sizeof(segID))+ + int(unsafe.Sizeof(fieldID))+ int(unsafe.Sizeof(startTs))) assert.Equal(t, endTs, int64(0)) colType := UnsafeReadInt32(buffer, binary.Size(eventHeader{})+ int(unsafe.Sizeof(binVersion))+ int(unsafe.Sizeof(svrVersion))+ - int(unsafe.Sizeof(cmitID))+ + int(unsafe.Sizeof(commitID))+ int(unsafe.Sizeof(headLen))+ int(unsafe.Sizeof(collID))+ int(unsafe.Sizeof(partID))+ int(unsafe.Sizeof(segID))+ + int(unsafe.Sizeof(fieldID))+ int(unsafe.Sizeof(startTs))+ int(unsafe.Sizeof(endTs))) assert.Equal(t, colType, int32(-1)) @@ -127,11 +139,12 @@ func TestDescriptorEvent(t *testing.T) { postHeadOffset := binary.Size(eventHeader{}) + int(unsafe.Sizeof(binVersion)) + int(unsafe.Sizeof(svrVersion)) + - int(unsafe.Sizeof(cmitID)) + + int(unsafe.Sizeof(commitID)) + int(unsafe.Sizeof(headLen)) + int(unsafe.Sizeof(collID)) + int(unsafe.Sizeof(partID)) + int(unsafe.Sizeof(segID)) + + int(unsafe.Sizeof(fieldID)) + int(unsafe.Sizeof(startTs)) + int(unsafe.Sizeof(endTs)) + int(unsafe.Sizeof(colType)) @@ -182,9 +195,9 @@ func TestInsertEvent(t *testing.T) { pBuf := wBuf[payloadOffset:] pR, err := NewPayloadReader(dt, pBuf) assert.Nil(t, err) - vals, _, err := pR.GetDataFromPayload() + values, _, err := pR.GetDataFromPayload() assert.Nil(t, err) - assert.Equal(t, vals, ev) + assert.Equal(t, values, ev) err = pR.Close() assert.Nil(t, err) @@ -431,9 +444,9 @@ func TestDeleteEvent(t *testing.T) { pBuf := wBuf[payloadOffset:] pR, err := NewPayloadReader(dt, pBuf) assert.Nil(t, err) - vals, _, err := pR.GetDataFromPayload() + values, _, err := pR.GetDataFromPayload() assert.Nil(t, err) - assert.Equal(t, vals, ev) + assert.Equal(t, values, ev) err = pR.Close() assert.Nil(t, err) @@ -680,9 +693,9 @@ func TestCreateCollectionEvent(t *testing.T) { pBuf := wBuf[payloadOffset:] pR, err := NewPayloadReader(schemapb.DataType_INT64, pBuf) assert.Nil(t, err) - vals, _, err := pR.GetDataFromPayload() + values, _, err := pR.GetDataFromPayload() assert.Nil(t, err) - assert.Equal(t, vals, []int64{1, 2, 3, 4, 5, 6}) + assert.Equal(t, values, []int64{1, 2, 3, 4, 5, 6}) err = pR.Close() assert.Nil(t, err) @@ -803,9 +816,9 @@ func TestDropCollectionEvent(t *testing.T) { pBuf := wBuf[payloadOffset:] pR, err := NewPayloadReader(schemapb.DataType_INT64, pBuf) assert.Nil(t, err) - vals, _, err := pR.GetDataFromPayload() + values, _, err := pR.GetDataFromPayload() assert.Nil(t, err) - assert.Equal(t, vals, []int64{1, 2, 3, 4, 5, 6}) + assert.Equal(t, values, []int64{1, 2, 3, 4, 5, 6}) err = pR.Close() assert.Nil(t, err) @@ -926,9 +939,9 @@ func TestCreatePartitionEvent(t *testing.T) { pBuf := wBuf[payloadOffset:] pR, err := NewPayloadReader(schemapb.DataType_INT64, pBuf) assert.Nil(t, err) - vals, _, err := pR.GetDataFromPayload() + values, _, err := pR.GetDataFromPayload() assert.Nil(t, err) - assert.Equal(t, vals, []int64{1, 2, 3, 4, 5, 6}) + assert.Equal(t, values, []int64{1, 2, 3, 4, 5, 6}) err = pR.Close() assert.Nil(t, err) @@ -1049,9 +1062,9 @@ func TestDropPartitionEvent(t *testing.T) { pBuf := wBuf[payloadOffset:] pR, err 
:= NewPayloadReader(schemapb.DataType_INT64, pBuf) assert.Nil(t, err) - vals, _, err := pR.GetDataFromPayload() + values, _, err := pR.GetDataFromPayload() assert.Nil(t, err) - assert.Equal(t, vals, []int64{1, 2, 3, 4, 5, 6}) + assert.Equal(t, values, []int64{1, 2, 3, 4, 5, 6}) err = pR.Close() assert.Nil(t, err) diff --git a/internal/storage/event_writer_test.go b/internal/storage/event_writer_test.go index ceaf696df8..caf95918a5 100644 --- a/internal/storage/event_writer_test.go +++ b/internal/storage/event_writer_test.go @@ -55,6 +55,8 @@ func TestEventWriter(t *testing.T) { err = insertEvent.AddInt32ToPayload([]int32{1}) assert.NotNil(t, err) buffer := new(bytes.Buffer) + insertEvent.SetStartTimestamp(100) + insertEvent.SetEndTimestamp(200) err = insertEvent.Write(buffer) assert.Nil(t, err) length, err = insertEvent.GetMemoryUsageInBytes()
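Reviewer note: the write-side contract this patch introduces is easy to trip over. `Close()` on the binlog writer, and `WriteEventData` on every event type, now fail with "hasn't set start time stamp" / "hasn't set end time stamp" unless the timestamps were set first, and `GetBuffer()` only succeeds after `Close()`. The sketch below mirrors `TestInsertBinlog` and `TestBinlogWriterReader` above; `buildInsertBinlog` is a hypothetical helper name for illustration, not part of the patch.

```go
package storage

import (
	"fmt"

	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)

// buildInsertBinlog shows the order of calls the new API expects:
// write payload, set event timestamps, set writer timestamps, Close,
// then read the buffer.
func buildInsertBinlog() ([]byte, error) {
	// The descriptor event now records collection 10, partition 20,
	// segment 30, and field 40 via the extended constructor signature.
	w, err := NewInsertBinlogWriter(schemapb.DataType_INT64, 10, 20, 30, 40)
	if err != nil {
		return nil, err
	}

	e, err := w.NextInsertEventWriter()
	if err != nil {
		return nil, err
	}
	// Payload chunks must match the declared data type; a mismatched
	// AddDataToPayload call (e.g. []int32 here) returns an error.
	if err := e.AddDataToPayload([]int64{1, 2, 3}); err != nil {
		return nil, err
	}
	e.SetStartTimestamp(100)
	e.SetEndTimestamp(200)

	// Writer-level timestamps are mandatory as of this patch;
	// Close() errors out if either is still zero.
	w.SetStartTimeStamp(1000)
	w.SetEndTimeStamp(2000)

	if err := w.Close(); err != nil {
		return nil, err
	}
	// The buffer starts with MagicNumber followed by the descriptor
	// event; event offsets now account for the magic number's size.
	buf, err := w.GetBuffer()
	if err != nil {
		return nil, err
	}
	fmt.Printf("binlog is %d bytes\n", len(buf))
	return buf, nil
}
```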
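Similarly, the reworked `InsertCodec` drops the `logIdx` parameter, keys `InsertData.Data` by `int64` field IDs that must match the schema, and recovers partition/segment IDs from the descriptor event inside each blob instead of parsing the blob key. A minimal round-trip sketch, assuming a two-field schema (field 1 must carry the row timestamps, since `Serialize` derives the time range from `data.Data[1]`); `roundTripInsertData` and the field values are hypothetical:

```go
package storage

import (
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)

// roundTripInsertData serializes one segment's worth of data and reads
// it back, as TestInsertCodec does above.
func roundTripInsertData() error {
	codec := &InsertCodec{
		Base: Base{
			Version:  1,
			CommitID: 1,
			TenantID: 1,
			Schema: &etcdpb.CollectionMeta{
				ID: 1,
				Schema: &schemapb.CollectionSchema{
					Name: "demo",
					Fields: []*schemapb.FieldSchema{
						{FieldID: 1, Name: "Ts", DataType: schemapb.DataType_INT64},
						{FieldID: 100, Name: "field_int32", DataType: schemapb.DataType_INT32},
					},
				},
			},
		},
	}
	data := &InsertData{Data: map[int64]FieldData{
		1:   Int64FieldData{NumRows: 2, data: []int64{1, 2}},
		100: Int32FieldData{NumRows: 2, data: []int32{3, 4}},
	}}

	// One blob per schema field, keyed "<segmentID>/<fieldID>".
	blobs, err := codec.Serialize(1 /*partitionID*/, 1 /*segmentID*/, data)
	if err != nil {
		return err
	}

	// Partition and segment IDs come back from the binlog descriptor,
	// not from the blob key.
	_, _, result, err := codec.Deserialize(blobs)
	if err != nil {
		return err
	}
	_ = result // equal to data, as asserted in TestInsertCodec
	return codec.Close()
}
```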