milvus/internal/storage/binlog_writer.go

339 lines
9.9 KiB
Go
Raw Normal View History

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package storage
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
// BinlogType is to distinguish different files saving different data.
type BinlogType int32
const (
// InsertBinlog BinlogType for insert data
InsertBinlog BinlogType = iota
// DeleteBinlog BinlogType for delete data
DeleteBinlog
// DDLBinlog BinlogType for DDL
DDLBinlog
// IndexFileBinlog BinlogType for index
IndexFileBinlog
)
const (
// MagicNumber used in binlog
MagicNumber int32 = 0xfffabc
)
type baseBinlogWriter struct {
descriptorEvent
magicNumber int32
binlogType BinlogType
eventWriters []EventWriter
buffer *bytes.Buffer
length int32
}
func (writer *baseBinlogWriter) isClosed() bool {
return writer.buffer != nil
}
// GetEventNums returns the number of event writers
func (writer *baseBinlogWriter) GetEventNums() int32 {
return int32(len(writer.eventWriters))
}
// GetRowNums returns writer's number of rows
func (writer *baseBinlogWriter) GetRowNums() (int32, error) {
if writer.isClosed() {
return writer.length, nil
}
length := 0
for _, e := range writer.eventWriters {
rows, err := e.GetPayloadLengthFromWriter()
if err != nil {
return 0, err
}
length += rows
}
return int32(length), nil
}
// GetBinlogType returns writer's binlogType
func (writer *baseBinlogWriter) GetBinlogType() BinlogType {
return writer.binlogType
}
// GetBuffer get binlog buffer. Return nil if binlog is not finished yet.
func (writer *baseBinlogWriter) GetBuffer() ([]byte, error) {
if writer.buffer == nil {
return nil, fmt.Errorf("please close binlog before get buffer")
}
return writer.buffer.Bytes(), nil
}
// Close allocate buffer and release resource
func (writer *baseBinlogWriter) Finish() error {
if writer.buffer != nil {
return nil
}
if writer.StartTimestamp == 0 || writer.EndTimestamp == 0 {
return fmt.Errorf("invalid start/end timestamp")
}
var offset int32
writer.buffer = new(bytes.Buffer)
if err := binary.Write(writer.buffer, common.Endian, int32(MagicNumber)); err != nil {
return err
}
offset += int32(binary.Size(MagicNumber))
if err := writer.descriptorEvent.Write(writer.buffer); err != nil {
return err
}
offset += writer.descriptorEvent.GetMemoryUsageInBytes()
writer.length = 0
for _, w := range writer.eventWriters {
w.SetOffset(offset)
if err := w.Finish(); err != nil {
return err
}
if err := w.Write(writer.buffer); err != nil {
return err
}
length, err := w.GetMemoryUsageInBytes()
if err != nil {
return err
}
offset += length
rows, err := w.GetPayloadLengthFromWriter()
if err != nil {
return err
}
writer.length += int32(rows)
}
return nil
}
func (writer *baseBinlogWriter) Close() {
for _, e := range writer.eventWriters {
e.Close()
}
}
// InsertBinlogWriter is an object to write binlog file which saves insert data.
type InsertBinlogWriter struct {
baseBinlogWriter
}
// NextInsertEventWriter returns an event writer to write insert data to an event.
func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newInsertEventWriter(writer.PayloadDataType)
if err != nil {
return nil, err
}
writer.eventWriters = append(writer.eventWriters, event)
return event, nil
}
// DeleteBinlogWriter is an object to write binlog file which saves delete data.
type DeleteBinlogWriter struct {
baseBinlogWriter
}
// NextDeleteEventWriter returns an event writer to write delete data to an event.
func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newDeleteEventWriter(writer.PayloadDataType)
if err != nil {
return nil, err
}
writer.eventWriters = append(writer.eventWriters, event)
return event, nil
}
// DDLBinlogWriter is an object to write binlog file which saves ddl information.
type DDLBinlogWriter struct {
baseBinlogWriter
}
// NextCreateCollectionEventWriter returns an event writer to write CreateCollection
// information to an event.
func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newCreateCollectionEventWriter(writer.PayloadDataType)
if err != nil {
return nil, err
}
writer.eventWriters = append(writer.eventWriters, event)
return event, nil
}
// NextDropCollectionEventWriter returns an event writer to write DropCollection
// information to an event.
func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newDropCollectionEventWriter(writer.PayloadDataType)
if err != nil {
return nil, err
}
writer.eventWriters = append(writer.eventWriters, event)
return event, nil
}
// NextCreatePartitionEventWriter returns an event writer to write CreatePartition
// information to an event.
func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newCreatePartitionEventWriter(writer.PayloadDataType)
if err != nil {
return nil, err
}
writer.eventWriters = append(writer.eventWriters, event)
return event, nil
}
// NextDropPartitionEventWriter returns an event writer to write DropPartition
// information to an event.
func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newDropPartitionEventWriter(writer.PayloadDataType)
if err != nil {
return nil, err
}
writer.eventWriters = append(writer.eventWriters, event)
return event, nil
}
// IndexFileBinlogWriter is an object to write binlog file which saves index files
type IndexFileBinlogWriter struct {
baseBinlogWriter
}
// NextIndexFileEventWriter return next available EventWriter
func (writer *IndexFileBinlogWriter) NextIndexFileEventWriter() (*indexFileEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newIndexFileEventWriter()
if err != nil {
return nil, err
}
writer.eventWriters = append(writer.eventWriters, event)
return event, nil
}
// NewInsertBinlogWriter creates InsertBinlogWriter to write binlog file.
func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID, FieldID int64) *InsertBinlogWriter {
descriptorEvent := newDescriptorEvent()
descriptorEvent.PayloadDataType = dataType
descriptorEvent.CollectionID = collectionID
descriptorEvent.PartitionID = partitionID
descriptorEvent.SegmentID = segmentID
descriptorEvent.FieldID = FieldID
return &InsertBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
magicNumber: MagicNumber,
binlogType: InsertBinlog,
eventWriters: make([]EventWriter, 0),
buffer: nil,
},
}
}
// NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.
func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64) *DeleteBinlogWriter {
descriptorEvent := newDescriptorEvent()
descriptorEvent.PayloadDataType = dataType
descriptorEvent.CollectionID = collectionID
descriptorEvent.PartitionID = partitionID
descriptorEvent.SegmentID = segmentID
return &DeleteBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
magicNumber: MagicNumber,
binlogType: DeleteBinlog,
eventWriters: make([]EventWriter, 0),
buffer: nil,
},
}
}
// NewDDLBinlogWriter creates DDLBinlogWriter to write binlog file.
func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter {
descriptorEvent := newDescriptorEvent()
descriptorEvent.PayloadDataType = dataType
descriptorEvent.CollectionID = collectionID
return &DDLBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
magicNumber: MagicNumber,
binlogType: DDLBinlog,
eventWriters: make([]EventWriter, 0),
buffer: nil,
},
}
}
// NewIndexFileBinlogWriter returns a new IndexFileBinlogWriter with provided parameters
func NewIndexFileBinlogWriter(
indexBuildID UniqueID,
version int64,
collectionID UniqueID,
partitionID UniqueID,
segmentID UniqueID,
fieldID UniqueID,
indexName string,
indexID UniqueID,
key string,
) *IndexFileBinlogWriter {
descriptorEvent := newDescriptorEvent()
descriptorEvent.CollectionID = collectionID
descriptorEvent.PartitionID = partitionID
descriptorEvent.SegmentID = segmentID
descriptorEvent.FieldID = fieldID
descriptorEvent.PayloadDataType = schemapb.DataType_Int8
descriptorEvent.AddExtra("indexBuildID", fmt.Sprintf("%d", indexBuildID))
descriptorEvent.AddExtra("version", fmt.Sprintf("%d", version))
descriptorEvent.AddExtra("indexName", indexName)
descriptorEvent.AddExtra("indexID", fmt.Sprintf("%d", indexID))
descriptorEvent.AddExtra("key", key)
return &IndexFileBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
magicNumber: MagicNumber,
binlogType: IndexFileBinlog,
eventWriters: make([]EventWriter, 0),
buffer: nil,
},
}
}