diff --git a/internal/datanode/iterators/binlog_iterator.go b/internal/datanode/iterators/binlog_iterator.go new file mode 100644 index 0000000000..884efaf14e --- /dev/null +++ b/internal/datanode/iterators/binlog_iterator.go @@ -0,0 +1,103 @@ +package iterator + +import ( + "sync" + + "go.uber.org/atomic" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type BinlogIterator struct { + disposed atomic.Bool + disposedCh chan struct{} + disposedOnce sync.Once + + data *storage.InsertData + label *Label + pkFieldID int64 + pkType schemapb.DataType + pos int +} + +var _ Iterator = (*BinlogIterator)(nil) + +// NewInsertBinlogIterator creates a new iterator +func NewInsertBinlogIterator(v [][]byte, pkFieldID typeutil.UniqueID, pkType schemapb.DataType, label *Label) (*BinlogIterator, error) { + blobs := make([]*storage.Blob, len(v)) + for i := range blobs { + blobs[i] = &storage.Blob{Value: v[i]} + } + + reader := storage.NewInsertCodec() + _, _, iData, err := reader.Deserialize(blobs) + if err != nil { + return nil, err + } + + return &BinlogIterator{ + disposedCh: make(chan struct{}), + data: iData, + pkFieldID: pkFieldID, + pkType: pkType, + label: label, + }, nil +} + +// HasNext returns true if the iterator have unread record +func (i *BinlogIterator) HasNext() bool { + return !i.isDisposed() && i.hasNext() +} + +func (i *BinlogIterator) Next() (*LabeledRowData, error) { + if i.isDisposed() { + return nil, ErrDisposed + } + + if !i.hasNext() { + return nil, ErrNoMoreRecord + } + + fields := make(map[int64]interface{}) + for fieldID, fieldData := range i.data.Data { + fields[fieldID] = fieldData.GetRow(i.pos) + } + + pk, err := storage.GenPrimaryKeyByRawData(i.data.Data[i.pkFieldID].GetRow(i.pos), i.pkType) + if err != nil { + return nil, err + } + + row := &InsertRow{ + ID: i.data.Data[common.RowIDField].GetRow(i.pos).(int64), + Timestamp: uint64(i.data.Data[common.TimeStampField].GetRow(i.pos).(int64)), + PK: pk, + Value: fields, + } + i.pos++ + return NewLabeledRowData(row, i.label), nil +} + +// Dispose disposes the iterator +func (i *BinlogIterator) Dispose() { + i.disposed.CompareAndSwap(false, true) + i.disposedOnce.Do(func() { + close(i.disposedCh) + }) +} + +func (i *BinlogIterator) hasNext() bool { + return i.pos < i.data.GetRowNum() +} + +func (i *BinlogIterator) isDisposed() bool { + return i.disposed.Load() +} + +// Disposed wait forever for the iterator to dispose +func (i *BinlogIterator) WaitForDisposed() { + <-i.disposedCh +} diff --git a/internal/datanode/iterators/binlog_iterator_test.go b/internal/datanode/iterators/binlog_iterator_test.go new file mode 100644 index 0000000000..e9e95ba442 --- /dev/null +++ b/internal/datanode/iterators/binlog_iterator_test.go @@ -0,0 +1,305 @@ +package iterator + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/storage" +) + +func TestInsertBinlogIteratorSuite(t *testing.T) { + suite.Run(t, new(InsertBinlogIteratorSuite)) +} + +const ( + CollectionID = 10000 + PartitionID = 10001 + SegmentID = 10002 + RowIDField = 0 + TimestampField = 1 + BoolField = 100 + Int8Field = 101 + Int16Field = 102 + Int32Field = 103 + Int64Field = 104 + FloatField = 105 + DoubleField = 106 + StringField = 107 + BinaryVectorField = 108 + FloatVectorField = 109 + ArrayField = 110 + JSONField = 111 + Float16VectorField = 112 +) + +type InsertBinlogIteratorSuite struct { + suite.Suite + + i *BinlogIterator +} + +func (s *InsertBinlogIteratorSuite) TestBinlogIterator() { + insertData, meta := genTestInsertData() + writer := storage.NewInsertCodecWithSchema(meta) + blobs, err := writer.Serialize(PartitionID, SegmentID, insertData) + s.Require().NoError(err) + + values := [][]byte{} + for _, b := range blobs { + values = append(values, b.Value[:]) + } + s.Run("invalid blobs", func() { + iter, err := NewInsertBinlogIterator([][]byte{}, Int64Field, schemapb.DataType_Int64, nil) + s.Error(err) + s.Nil(iter) + }) + + s.Run("invalid pk type", func() { + iter, err := NewInsertBinlogIterator(values, Int64Field, schemapb.DataType_Float, &Label{segmentID: 19530}) + s.NoError(err) + + _, err = iter.Next() + s.Error(err) + }) + + s.Run("normal", func() { + iter, err := NewInsertBinlogIterator(values, Int64Field, schemapb.DataType_Int64, &Label{segmentID: 19530}) + s.NoError(err) + + rows := []interface{}{} + var idx int = 0 // row number + + for iter.HasNext() { + labeled, err := iter.Next() + s.NoError(err) + s.Equal(int64(19530), labeled.GetSegmentID()) + + rows = append(rows, labeled.data) + + insertRow, ok := labeled.data.(*InsertRow) + s.True(ok) + + s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), insertRow.PK.GetValue().(int64)) + s.Equal(insertData.Data[RowIDField].GetRow(idx).(int64), insertRow.ID) + s.Equal(insertData.Data[BoolField].GetRow(idx).(bool), insertRow.Value[BoolField].(bool)) + s.Equal(insertData.Data[Int8Field].GetRow(idx).(int8), insertRow.Value[Int8Field].(int8)) + s.Equal(insertData.Data[Int16Field].GetRow(idx).(int16), insertRow.Value[Int16Field].(int16)) + s.Equal(insertData.Data[Int32Field].GetRow(idx).(int32), insertRow.Value[Int32Field].(int32)) + s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), insertRow.Value[Int64Field].(int64)) + s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), insertRow.Value[Int64Field].(int64)) + s.Equal(insertData.Data[FloatField].GetRow(idx).(float32), insertRow.Value[FloatField].(float32)) + s.Equal(insertData.Data[DoubleField].GetRow(idx).(float64), insertRow.Value[DoubleField].(float64)) + s.Equal(insertData.Data[StringField].GetRow(idx).(string), insertRow.Value[StringField].(string)) + s.Equal(insertData.Data[ArrayField].GetRow(idx).(*schemapb.ScalarField).GetIntData().Data, insertRow.Value[ArrayField].(*schemapb.ScalarField).GetIntData().Data) + s.Equal(insertData.Data[JSONField].GetRow(idx).([]byte), insertRow.Value[JSONField].([]byte)) + s.Equal(insertData.Data[BinaryVectorField].GetRow(idx).([]byte), insertRow.Value[BinaryVectorField].([]byte)) + s.Equal(insertData.Data[FloatVectorField].GetRow(idx).([]float32), insertRow.Value[FloatVectorField].([]float32)) + s.Equal(insertData.Data[Float16VectorField].GetRow(idx).([]byte), insertRow.Value[Float16VectorField].([]byte)) + + idx++ + } + + s.Equal(2, len(rows)) + + _, err = iter.Next() + s.ErrorIs(err, ErrNoMoreRecord) + + iter.Dispose() + iter.WaitForDisposed() + + _, err = iter.Next() + s.ErrorIs(err, ErrDisposed) + }) +} + +func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) { + meta := &etcdpb.CollectionMeta{ + ID: CollectionID, + CreateTime: 1, + SegmentIDs: []int64{SegmentID}, + PartitionTags: []string{"partition_0", "partition_1"}, + Schema: &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + AutoID: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: RowIDField, + Name: "row_id", + IsPrimaryKey: false, + Description: "row_id", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: TimestampField, + Name: "Timestamp", + IsPrimaryKey: false, + Description: "Timestamp", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: BoolField, + Name: "field_bool", + IsPrimaryKey: false, + Description: "bool", + DataType: schemapb.DataType_Bool, + }, + { + FieldID: Int8Field, + Name: "field_int8", + IsPrimaryKey: false, + Description: "int8", + DataType: schemapb.DataType_Int8, + }, + { + FieldID: Int16Field, + Name: "field_int16", + IsPrimaryKey: false, + Description: "int16", + DataType: schemapb.DataType_Int16, + }, + { + FieldID: Int32Field, + Name: "field_int32", + IsPrimaryKey: false, + Description: "int32", + DataType: schemapb.DataType_Int32, + }, + { + FieldID: Int64Field, + Name: "field_int64", + IsPrimaryKey: true, + Description: "int64", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: FloatField, + Name: "field_float", + IsPrimaryKey: false, + Description: "float", + DataType: schemapb.DataType_Float, + }, + { + FieldID: DoubleField, + Name: "field_double", + IsPrimaryKey: false, + Description: "double", + DataType: schemapb.DataType_Double, + }, + { + FieldID: StringField, + Name: "field_string", + IsPrimaryKey: false, + Description: "string", + DataType: schemapb.DataType_String, + }, + { + FieldID: ArrayField, + Name: "field_int32_array", + Description: "int32 array", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int32, + }, + { + FieldID: JSONField, + Name: "field_json", + Description: "json", + DataType: schemapb.DataType_JSON, + }, + { + FieldID: BinaryVectorField, + Name: "field_binary_vector", + IsPrimaryKey: false, + Description: "binary_vector", + DataType: schemapb.DataType_BinaryVector, + }, + { + FieldID: FloatVectorField, + Name: "field_float_vector", + IsPrimaryKey: false, + Description: "float_vector", + DataType: schemapb.DataType_FloatVector, + }, + { + FieldID: Float16VectorField, + Name: "field_float16_vector", + IsPrimaryKey: false, + Description: "float16_vector", + DataType: schemapb.DataType_Float16Vector, + }, + }, + }, + } + insertData := storage.InsertData{ + Data: map[int64]storage.FieldData{ + RowIDField: &storage.Int64FieldData{ + Data: []int64{3, 4}, + }, + TimestampField: &storage.Int64FieldData{ + Data: []int64{3, 4}, + }, + BoolField: &storage.BoolFieldData{ + Data: []bool{true, false}, + }, + Int8Field: &storage.Int8FieldData{ + Data: []int8{3, 4}, + }, + Int16Field: &storage.Int16FieldData{ + Data: []int16{3, 4}, + }, + Int32Field: &storage.Int32FieldData{ + Data: []int32{3, 4}, + }, + Int64Field: &storage.Int64FieldData{ + Data: []int64{3, 4}, + }, + FloatField: &storage.FloatFieldData{ + Data: []float32{3, 4}, + }, + DoubleField: &storage.DoubleFieldData{ + Data: []float64{3, 4}, + }, + StringField: &storage.StringFieldData{ + Data: []string{"3", "4"}, + }, + BinaryVectorField: &storage.BinaryVectorFieldData{ + Data: []byte{0, 255}, + Dim: 8, + }, + FloatVectorField: &storage.FloatVectorFieldData{ + Data: []float32{4, 5, 6, 7, 4, 5, 6, 7}, + Dim: 4, + }, + ArrayField: &storage.ArrayFieldData{ + ElementType: schemapb.DataType_Int32, + Data: []*schemapb.ScalarField{ + { + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{3, 2, 1}}, + }, + }, + { + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{6, 5, 4}}, + }, + }, + }, + }, + JSONField: &storage.JSONFieldData{ + Data: [][]byte{ + []byte(`{"batch":2}`), + []byte(`{"key":"world"}`), + }, + }, + Float16VectorField: &storage.Float16VectorFieldData{ + Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255}, + Dim: 4, + }, + }, + } + + return &insertData, meta +} diff --git a/internal/datanode/iterators/deltalog_iterator.go b/internal/datanode/iterators/deltalog_iterator.go new file mode 100644 index 0000000000..3d63ee1b2c --- /dev/null +++ b/internal/datanode/iterators/deltalog_iterator.go @@ -0,0 +1,80 @@ +package iterator + +import ( + "sync" + + "go.uber.org/atomic" + + "github.com/milvus-io/milvus/internal/storage" +) + +var _ Iterator = (*DeltalogIterator)(nil) + +type DeltalogIterator struct { + disposeCh chan struct{} + disposedOnce sync.Once + disposed atomic.Bool + + data *storage.DeleteData + label *Label + pos int +} + +func NewDeltalogIterator(v [][]byte, label *Label) (*DeltalogIterator, error) { + blobs := make([]*storage.Blob, len(v)) + for i := range blobs { + blobs[i] = &storage.Blob{Value: v[i]} + } + + reader := storage.NewDeleteCodec() + _, _, dData, err := reader.Deserialize(blobs) + if err != nil { + return nil, err + } + return &DeltalogIterator{ + disposeCh: make(chan struct{}), + data: dData, + label: label, + }, nil +} + +func (d *DeltalogIterator) HasNext() bool { + return !d.isDisposed() && d.hasNext() +} + +func (d *DeltalogIterator) Next() (*LabeledRowData, error) { + if d.isDisposed() { + return nil, ErrDisposed + } + + if !d.hasNext() { + return nil, ErrNoMoreRecord + } + + row := &DeltalogRow{ + Pk: d.data.Pks[d.pos], + Timestamp: d.data.Tss[d.pos], + } + d.pos++ + + return NewLabeledRowData(row, d.label), nil +} + +func (d *DeltalogIterator) Dispose() { + d.disposed.CompareAndSwap(false, true) + d.disposedOnce.Do(func() { + close(d.disposeCh) + }) +} + +func (d *DeltalogIterator) hasNext() bool { + return int64(d.pos) < d.data.RowCount +} + +func (d *DeltalogIterator) isDisposed() bool { + return d.disposed.Load() +} + +func (d *DeltalogIterator) WaitForDisposed() { + <-d.disposeCh +} diff --git a/internal/datanode/iterators/deltalog_iterator_test.go b/internal/datanode/iterators/deltalog_iterator_test.go new file mode 100644 index 0000000000..930b3f0f17 --- /dev/null +++ b/internal/datanode/iterators/deltalog_iterator_test.go @@ -0,0 +1,61 @@ +package iterator + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/storage" +) + +func TestDeltalogIteratorSuite(t *testing.T) { + suite.Run(t, new(DeltalogIteratorSuite)) +} + +type DeltalogIteratorSuite struct { + suite.Suite +} + +func (s *DeltalogIteratorSuite) TestDeltalogIteratorIntPK() { + testpks := []int64{1, 2, 3, 4} + testtss := []uint64{43757345, 43757346, 43757347, 43757348} + + dData := &storage.DeleteData{} + for i := range testpks { + dData.Append(storage.NewInt64PrimaryKey(testpks[i]), testtss[i]) + } + + dCodec := storage.NewDeleteCodec() + blob, err := dCodec.Serialize(CollectionID, 1, 1, dData) + s.Require().NoError(err) + value := [][]byte{blob.Value[:]} + + iter, err := NewDeltalogIterator(value, &Label{segmentID: 100}) + s.NoError(err) + + var ( + gotpks = []int64{} + gottss = []uint64{} + ) + + for iter.HasNext() { + labeled, err := iter.Next() + s.NoError(err) + + s.Equal(labeled.GetSegmentID(), int64(100)) + gotpks = append(gotpks, labeled.data.(*DeltalogRow).Pk.GetValue().(int64)) + gottss = append(gottss, labeled.data.(*DeltalogRow).Timestamp) + } + + s.ElementsMatch(gotpks, testpks) + s.ElementsMatch(gottss, testtss) + + _, err = iter.Next() + s.ErrorIs(err, ErrNoMoreRecord) + + iter.Dispose() + iter.WaitForDisposed() + + _, err = iter.Next() + s.ErrorIs(err, ErrDisposed) +} diff --git a/internal/datanode/iterators/iterator.go b/internal/datanode/iterators/iterator.go new file mode 100644 index 0000000000..d7d9c26c3c --- /dev/null +++ b/internal/datanode/iterators/iterator.go @@ -0,0 +1,62 @@ +package iterator + +import ( + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var ( + // ErrNoMoreRecord is the error that the iterator does not have next record. + ErrNoMoreRecord = errors.New("no more record") + // ErrDisposed is the error that the iterator is disposed. + ErrDisposed = errors.New("iterator is disposed") +) + +const InvalidID int64 = -1 + +type Row interface{} + +type InsertRow struct { + ID int64 + PK storage.PrimaryKey + Timestamp typeutil.Timestamp + Value map[storage.FieldID]interface{} +} + +type DeltalogRow struct { + Pk storage.PrimaryKey + Timestamp typeutil.Timestamp +} + +type Label struct { + segmentID typeutil.UniqueID +} + +type LabeledRowData struct { + label *Label + data Row +} + +func (l *LabeledRowData) GetSegmentID() typeutil.UniqueID { + if l.label == nil { + return InvalidID + } + + return l.label.segmentID +} + +func NewLabeledRowData(data Row, label *Label) *LabeledRowData { + return &LabeledRowData{ + label: label, + data: data, + } +} + +type Iterator interface { + HasNext() bool + Next() (*LabeledRowData, error) + Dispose() + WaitForDisposed() // wait until the iterator is disposed +} diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index e3000f2919..d05068255b 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -243,7 +243,8 @@ func TestInsertCodec(t *testing.T) { }, }, Float16VectorField: &Float16VectorFieldData{ - Data: []byte{0, 255, 0, 255, 0, 255, 0, 255}, + // length = 2 * Dim * numRows(2) = 16 + Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255}, Dim: 4, }, }, @@ -290,7 +291,8 @@ func TestInsertCodec(t *testing.T) { Dim: 4, }, Float16VectorField: &Float16VectorFieldData{ - Data: []byte{0, 255, 0, 255, 0, 255, 0, 255}, + // length = 2 * Dim * numRows(2) = 16 + Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255}, Dim: 4, }, ArrayField: &ArrayFieldData{ @@ -374,7 +376,12 @@ func TestInsertCodec(t *testing.T) { assert.Equal(t, []string{"1", "2", "3", "4"}, resultData.Data[StringField].(*StringFieldData).Data) assert.Equal(t, []byte{0, 255, 0, 255}, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).Data) assert.Equal(t, []float32{0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 6, 7, 4, 5, 6, 7}, resultData.Data[FloatVectorField].(*FloatVectorFieldData).Data) - assert.Equal(t, []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255}, resultData.Data[Float16VectorField].(*Float16VectorFieldData).Data) + assert.Equal(t, []byte{ + 0, 255, 0, 255, 0, 255, 0, 255, + 0, 255, 0, 255, 0, 255, 0, 255, + 0, 255, 0, 255, 0, 255, 0, 255, + 0, 255, 0, 255, 0, 255, 0, 255, + }, resultData.Data[Float16VectorField].(*Float16VectorFieldData).Data) int32ArrayList := [][]int32{{1, 2, 3}, {4, 5, 6}, {3, 2, 1}, {6, 5, 4}} resultArrayList := [][]int32{}