Import native binlog files (#19447)

Signed-off-by: yhmo <yihua.mo@zilliz.com>

groot 2022-09-30 10:32:54 +08:00 committed by GitHub
parent d79f88c5f6
commit 2599a3ece0
16 changed files with 4532 additions and 58 deletions

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -0,0 +1,526 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
"context"
"errors"
"github.com/milvus-io/milvus/api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"go.uber.org/zap"
)
// BinlogFile is a wrapper of storage.BinlogReader, used to read a binlog file block by block.
// Note: for the bulkload function, we only handle normal insert logs and delta logs.
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// Typically, an insert log file size is 16MB.
type BinlogFile struct {
chunkManager storage.ChunkManager // storage interfaces to read binlog files
reader *storage.BinlogReader // binlog reader
}
func NewBinlogFile(chunkManager storage.ChunkManager) (*BinlogFile, error) {
if chunkManager == nil {
log.Error("Binlog file: chunk manager pointer is nil")
return nil, errors.New("chunk manager pointer is nil")
}
binlogFile := &BinlogFile{
chunkManager: chunkManager,
}
return binlogFile, nil
}
func (p *BinlogFile) Open(filePath string) error {
p.Close()
if len(filePath) == 0 {
log.Error("Binlog file: binlog path is empty")
return errors.New("binlog path is empty")
}
// TODO add context
bytes, err := p.chunkManager.Read(context.TODO(), filePath)
if err != nil {
log.Error("Binlog file: failed to open binlog", zap.String("filePath", filePath), zap.Error(err))
return err
}
p.reader, err = storage.NewBinlogReader(bytes)
if err != nil {
log.Error("Binlog file: failed to initialize binlog reader", zap.String("filePath", filePath), zap.Error(err))
return err
}
log.Info("Binlog file: open binlog successfully", zap.String("filePath", filePath))
return nil
}
// The caller must call this method, typically in a defer statement
func (p *BinlogFile) Close() {
if p.reader != nil {
p.reader.Close()
p.reader = nil
}
}
func (p *BinlogFile) DataType() schemapb.DataType {
if p.reader == nil {
return schemapb.DataType_None
}
return p.reader.PayloadDataType
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
func (p *BinlogFile) ReadBool() ([]bool, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
result := make([]bool, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
}
// end of the file
if event == nil {
break
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Bool {
log.Error("Binlog file: binlog data type is not bool")
return nil, errors.New("binlog data type is not bool")
}
data, err := event.PayloadReaderInterface.GetBoolFromPayload()
if err != nil {
log.Error("Binlog file: failed to read bool data", zap.Error(err))
return nil, err
}
result = append(result, data...)
}
return result, nil
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
func (p *BinlogFile) ReadInt8() ([]int8, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
result := make([]int8, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
}
// end of the file
if event == nil {
break
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Int8 {
log.Error("Binlog file: binlog data type is not int8")
return nil, errors.New("binlog data type is not int8")
}
data, err := event.PayloadReaderInterface.GetInt8FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int8 data", zap.Error(err))
return nil, err
}
result = append(result, data...)
}
return result, nil
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
func (p *BinlogFile) ReadInt16() ([]int16, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
result := make([]int16, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
}
// end of the file
if event == nil {
break
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Int16 {
log.Error("Binlog file: binlog data type is not int16")
return nil, errors.New("binlog data type is not int16")
}
data, err := event.PayloadReaderInterface.GetInt16FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int16 data", zap.Error(err))
return nil, err
}
result = append(result, data...)
}
return result, nil
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
func (p *BinlogFile) ReadInt32() ([]int32, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
result := make([]int32, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
}
// end of the file
if event == nil {
break
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Int32 {
log.Error("Binlog file: binlog data type is not int32")
return nil, errors.New("binlog data type is not int32")
}
data, err := event.PayloadReaderInterface.GetInt32FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int32 data", zap.Error(err))
return nil, err
}
result = append(result, data...)
}
return result, nil
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
func (p *BinlogFile) ReadInt64() ([]int64, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
result := make([]int64, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
}
// end of the file
if event == nil {
break
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Int64 {
log.Error("Binlog file: binlog data type is not int64")
return nil, errors.New("binlog data type is not int64")
}
data, err := event.PayloadReaderInterface.GetInt64FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int64 data", zap.Error(err))
return nil, err
}
result = append(result, data...)
}
return result, nil
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
func (p *BinlogFile) ReadFloat() ([]float32, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
result := make([]float32, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
}
// end of the file
if event == nil {
break
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Float {
log.Error("Binlog file: binlog data type is not float")
return nil, errors.New("binlog data type is not float")
}
data, err := event.PayloadReaderInterface.GetFloatFromPayload()
if err != nil {
log.Error("Binlog file: failed to read float data", zap.Error(err))
return nil, err
}
result = append(result, data...)
}
return result, nil
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
func (p *BinlogFile) ReadDouble() ([]float64, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
result := make([]float64, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
}
// end of the file
if event == nil {
break
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Double {
log.Error("Binlog file: binlog data type is not double")
return nil, errors.New("binlog data type is not double")
}
data, err := event.PayloadReaderInterface.GetDoubleFromPayload()
if err != nil {
log.Error("Binlog file: failed to read double data", zap.Error(err))
return nil, err
}
result = append(result, data...)
}
return result, nil
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
func (p *BinlogFile) ReadVarchar() ([]string, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
result := make([]string, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
}
// end of the file
if event == nil {
break
}
// special case: the data type of delete events is varchar
if event.TypeCode != storage.InsertEventType && event.TypeCode != storage.DeleteEventType {
log.Error("Binlog file: binlog file is not insert or delete log")
return nil, errors.New("binlog file is not insert or delete log")
}
if (p.DataType() != schemapb.DataType_VarChar) && (p.DataType() != schemapb.DataType_String) {
log.Error("Binlog file: binlog data type is not varchar")
return nil, errors.New("binlog data type is not varchar")
}
data, err := event.PayloadReaderInterface.GetStringFromPayload()
if err != nil {
log.Error("Binlog file: failed to read varchar data", zap.Error(err))
return nil, err
}
result = append(result, data...)
}
return result, nil
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
// It returns the vector data and the dimension.
func (p *BinlogFile) ReadBinaryVector() ([]byte, int, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, 0, errors.New("binlog reader not yet initialized")
}
dim := 0
result := make([]byte, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, 0, err
}
// end of the file
if event == nil {
break
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
return nil, 0, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_BinaryVector {
log.Error("Binlog file: binlog data type is not binary vector")
return nil, 0, errors.New("binlog data type is not binary vector")
}
data, dimension, err := event.PayloadReaderInterface.GetBinaryVectorFromPayload()
if err != nil {
log.Error("Binlog file: failed to read binary vector data", zap.Error(err))
return nil, 0, err
}
dim = dimension
result = append(result, data...)
}
return result, dim, nil
}
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
// This method reads all blocks of the binlog for a specific data type.
// It returns the vector data and the dimension.
func (p *BinlogFile) ReadFloatVector() ([]float32, int, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
return nil, 0, errors.New("binlog reader not yet initialized")
}
dim := 0
result := make([]float32, 0)
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, 0, err
}
// end of the file
if event == nil {
break
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
return nil, 0, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_FloatVector {
log.Error("Binlog file: binlog data type is not float vector")
return nil, 0, errors.New("binlog data type is not float vector")
}
data, dimension, err := event.PayloadReaderInterface.GetFloatVectorFromPayload()
if err != nil {
log.Error("Binlog file: failed to read float vector data", zap.Error(err))
return nil, 0, err
}
dim = dimension
result = append(result, data...)
}
return result, dim, nil
}
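For orientation, here is a minimal usage sketch of the BinlogFile type above (illustrative only, not part of this commit). It assumes a storage.ChunkManager instance cm and an insert-log path filePath whose payload type is Int64; error handling is abbreviated.

binlogFile, err := NewBinlogFile(cm) // cm is an assumed storage.ChunkManager
if err != nil {
return err
}
if err := binlogFile.Open(filePath); err != nil { // filePath is an assumed insert-log path
return err
}
defer binlogFile.Close() // the caller is responsible for closing the reader
if binlogFile.DataType() == schemapb.DataType_Int64 {
ids, err := binlogFile.ReadInt64() // reads all blocks of the binlog as int64 values
if err != nil {
return err
}
_ = ids // e.g. feed the values into segment field data
}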


@@ -0,0 +1,761 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
"encoding/binary"
"errors"
"fmt"
"testing"
"github.com/milvus-io/milvus/api/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
)
func createBinlogBuf(t *testing.T, dataType schemapb.DataType, data interface{}) []byte {
w := storage.NewInsertBinlogWriter(dataType, 10, 20, 30, 40)
assert.NotNil(t, w)
defer w.Close()
dim := 0
if dataType == schemapb.DataType_BinaryVector {
vectors := data.([][]byte)
if len(vectors) > 0 {
dim = len(vectors[0]) * 8
}
} else if dataType == schemapb.DataType_FloatVector {
vectors := data.([][]float32)
if len(vectors) > 0 {
dim = len(vectors[0])
}
}
evt, err := w.NextInsertEventWriter(dim)
assert.Nil(t, err)
assert.NotNil(t, evt)
evt.SetEventTimestamp(100, 200)
w.SetEventTimeStamp(1000, 2000)
switch dataType {
case schemapb.DataType_Bool:
err = evt.AddBoolToPayload(data.([]bool))
assert.Nil(t, err)
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
sizeTotal := len(data.([]bool))
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
case schemapb.DataType_Int8:
err = evt.AddInt8ToPayload(data.([]int8))
assert.Nil(t, err)
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
sizeTotal := len(data.([]int8))
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
case schemapb.DataType_Int16:
err = evt.AddInt16ToPayload(data.([]int16))
assert.Nil(t, err)
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
sizeTotal := len(data.([]int16)) * 2
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
case schemapb.DataType_Int32:
err = evt.AddInt32ToPayload(data.([]int32))
assert.Nil(t, err)
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
sizeTotal := len(data.([]int32)) * 4
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
case schemapb.DataType_Int64:
err = evt.AddInt64ToPayload(data.([]int64))
assert.Nil(t, err)
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
sizeTotal := len(data.([]int64)) * 8
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
case schemapb.DataType_Float:
err = evt.AddFloatToPayload(data.([]float32))
assert.Nil(t, err)
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
sizeTotal := len(data.([]float32)) * 4
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
case schemapb.DataType_Double:
err = evt.AddDoubleToPayload(data.([]float64))
assert.Nil(t, err)
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
sizeTotal := len(data.([]float64)) * 8
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
case schemapb.DataType_VarChar:
values := data.([]string)
sizeTotal := 0
for _, val := range values {
err = evt.AddOneStringToPayload(val)
assert.Nil(t, err)
sizeTotal += binary.Size(val)
}
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
case schemapb.DataType_BinaryVector:
vectors := data.([][]byte)
for i := 0; i < len(vectors); i++ {
err = evt.AddBinaryVectorToPayload(vectors[i], dim)
assert.Nil(t, err)
}
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
sizeTotal := len(vectors) * dim / 8
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
case schemapb.DataType_FloatVector:
vectors := data.([][]float32)
for i := 0; i < len(vectors); i++ {
err = evt.AddFloatVectorToPayload(vectors[i], dim)
assert.Nil(t, err)
}
// without the two lines below, the test case will crash here.
// the "original_size" key comes from storage.originalSizeKey
sizeTotal := len(vectors) * dim * 4
w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
default:
assert.True(t, false)
return nil
}
err = w.Finish()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
assert.NotNil(t, buf)
return buf
}
func Test_NewBinlogFile(t *testing.T) {
// nil chunkManager
file, err := NewBinlogFile(nil)
assert.NotNil(t, err)
assert.Nil(t, file)
// succeed
file, err = NewBinlogFile(&MockChunkManager{})
assert.Nil(t, err)
assert.NotNil(t, file)
}
func Test_BinlogFileOpen(t *testing.T) {
chunkManager := &MockChunkManager{
readBuf: nil,
readErr: nil,
}
// read succeed
chunkManager.readBuf = map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_Bool, []bool{true}),
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.NotNil(t, binlogFile.reader)
dt := binlogFile.DataType()
assert.Equal(t, schemapb.DataType_Bool, dt)
// failed to read
err = binlogFile.Open("")
assert.NotNil(t, err)
chunkManager.readErr = errors.New("error")
err = binlogFile.Open("dummy")
assert.NotNil(t, err)
// failed to create new BinlogReader
chunkManager.readBuf["dummy"] = []byte{}
chunkManager.readErr = nil
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.NotNil(t, err)
assert.Nil(t, binlogFile.reader)
dt = binlogFile.DataType()
assert.Equal(t, schemapb.DataType_None, dt)
// nil reader protect
dataBool, err := binlogFile.ReadBool()
assert.Nil(t, dataBool)
assert.NotNil(t, err)
dataInt8, err := binlogFile.ReadInt8()
assert.Nil(t, dataInt8)
assert.NotNil(t, err)
dataInt16, err := binlogFile.ReadInt16()
assert.Nil(t, dataInt16)
assert.NotNil(t, err)
dataInt32, err := binlogFile.ReadInt32()
assert.Nil(t, dataInt32)
assert.NotNil(t, err)
dataInt64, err := binlogFile.ReadInt64()
assert.Nil(t, dataInt64)
assert.NotNil(t, err)
dataFloat, err := binlogFile.ReadFloat()
assert.Nil(t, dataFloat)
assert.NotNil(t, err)
dataDouble, err := binlogFile.ReadDouble()
assert.Nil(t, dataDouble)
assert.NotNil(t, err)
dataVarchar, err := binlogFile.ReadVarchar()
assert.Nil(t, dataVarchar)
assert.NotNil(t, err)
dataBinaryVector, dim, err := binlogFile.ReadBinaryVector()
assert.Nil(t, dataBinaryVector)
assert.Equal(t, 0, dim)
assert.NotNil(t, err)
dataFloatVector, dim, err := binlogFile.ReadFloatVector()
assert.Nil(t, dataFloatVector)
assert.Equal(t, 0, dim)
assert.NotNil(t, err)
}
func Test_BinlogFileBool(t *testing.T) {
source := []bool{true, false, true, false}
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_Bool, source),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_Bool, binlogFile.DataType())
data, err := binlogFile.ReadBool()
assert.Nil(t, err)
assert.NotNil(t, data)
assert.Equal(t, len(source), len(data))
for i := 0; i < len(source); i++ {
assert.Equal(t, source[i], data[i])
}
binlogFile.Close()
// wrong data type reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
d, err := binlogFile.ReadInt8()
assert.Zero(t, len(d))
assert.NotNil(t, err)
binlogFile.Close()
// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
data, err = binlogFile.ReadBool()
assert.Zero(t, len(data))
assert.NotNil(t, err)
binlogFile.Close()
}
func Test_BinlogFileInt8(t *testing.T) {
source := []int8{2, 4, 6, 8}
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_Int8, source),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_Int8, binlogFile.DataType())
data, err := binlogFile.ReadInt8()
assert.Nil(t, err)
assert.NotNil(t, data)
assert.Equal(t, len(source), len(data))
for i := 0; i < len(source); i++ {
assert.Equal(t, source[i], data[i])
}
binlogFile.Close()
// wrong data type reading
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
d, err := binlogFile.ReadInt16()
assert.Zero(t, len(d))
assert.NotNil(t, err)
binlogFile.Close()
// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
data, err = binlogFile.ReadInt8()
assert.Zero(t, len(data))
assert.NotNil(t, err)
binlogFile.Close()
}
func Test_BinlogFileInt16(t *testing.T) {
source := []int16{2, 4, 6, 8}
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_Int16, source),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_Int16, binlogFile.DataType())
data, err := binlogFile.ReadInt16()
assert.Nil(t, err)
assert.NotNil(t, data)
assert.Equal(t, len(source), len(data))
for i := 0; i < len(source); i++ {
assert.Equal(t, source[i], data[i])
}
binlogFile.Close()
// wrong data type reading
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
d, err := binlogFile.ReadInt32()
assert.Zero(t, len(d))
assert.NotNil(t, err)
binlogFile.Close()
// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
data, err = binlogFile.ReadInt16()
assert.Zero(t, len(data))
assert.NotNil(t, err)
binlogFile.Close()
}
func Test_BinlogFileInt32(t *testing.T) {
source := []int32{2, 4, 6, 8}
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_Int32, source),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_Int32, binlogFile.DataType())
data, err := binlogFile.ReadInt32()
assert.Nil(t, err)
assert.NotNil(t, data)
assert.Equal(t, len(source), len(data))
for i := 0; i < len(source); i++ {
assert.Equal(t, source[i], data[i])
}
binlogFile.Close()
// wrong data type reading
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
d, err := binlogFile.ReadInt64()
assert.Zero(t, len(d))
assert.NotNil(t, err)
binlogFile.Close()
// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
data, err = binlogFile.ReadInt32()
assert.Zero(t, len(data))
assert.NotNil(t, err)
binlogFile.Close()
}
func Test_BinlogFileInt64(t *testing.T) {
source := []int64{2, 4, 6, 8}
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_Int64, source),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_Int64, binlogFile.DataType())
data, err := binlogFile.ReadInt64()
assert.Nil(t, err)
assert.NotNil(t, data)
assert.Equal(t, len(source), len(data))
for i := 0; i < len(source); i++ {
assert.Equal(t, source[i], data[i])
}
binlogFile.Close()
// wrong data type reading
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
d, err := binlogFile.ReadFloat()
assert.Zero(t, len(d))
assert.NotNil(t, err)
binlogFile.Close()
// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
data, err = binlogFile.ReadInt64()
assert.Zero(t, len(data))
assert.NotNil(t, err)
binlogFile.Close()
}
func Test_BinlogFileFloat(t *testing.T) {
source := []float32{2, 4, 6, 8}
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_Float, source),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_Float, binlogFile.DataType())
data, err := binlogFile.ReadFloat()
assert.Nil(t, err)
assert.NotNil(t, data)
assert.Equal(t, len(source), len(data))
for i := 0; i < len(source); i++ {
assert.Equal(t, source[i], data[i])
}
binlogFile.Close()
// wrong data type reading
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
d, err := binlogFile.ReadDouble()
assert.Zero(t, len(d))
assert.NotNil(t, err)
binlogFile.Close()
// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
data, err = binlogFile.ReadFloat()
assert.Zero(t, len(data))
assert.NotNil(t, err)
binlogFile.Close()
}
func Test_BinlogFileDouble(t *testing.T) {
source := []float64{2, 4, 6, 8}
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_Double, source),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_Double, binlogFile.DataType())
data, err := binlogFile.ReadDouble()
assert.Nil(t, err)
assert.NotNil(t, data)
assert.Equal(t, len(source), len(data))
for i := 0; i < len(source); i++ {
assert.Equal(t, source[i], data[i])
}
binlogFile.Close()
// wrong data type reading
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
d, err := binlogFile.ReadVarchar()
assert.Zero(t, len(d))
assert.NotNil(t, err)
binlogFile.Close()
// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
data, err = binlogFile.ReadDouble()
assert.Zero(t, len(data))
assert.NotNil(t, err)
binlogFile.Close()
}
func Test_BinlogFileVarchar(t *testing.T) {
source := []string{"a", "b", "c", "d"}
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_VarChar, source),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_VarChar, binlogFile.DataType())
data, err := binlogFile.ReadVarchar()
assert.Nil(t, err)
assert.NotNil(t, data)
assert.Equal(t, len(source), len(data))
for i := 0; i < len(source); i++ {
assert.Equal(t, source[i], data[i])
}
binlogFile.Close()
// wrong data type reading
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
d, dim, err := binlogFile.ReadBinaryVector()
assert.Zero(t, len(d))
assert.Zero(t, dim)
assert.NotNil(t, err)
binlogFile.Close()
}
func Test_BinlogFileBinaryVector(t *testing.T) {
vectors := make([][]byte, 0)
vectors = append(vectors, []byte{1, 3, 5, 7})
vectors = append(vectors, []byte{2, 4, 6, 8})
dim := len(vectors[0]) * 8
vecCount := len(vectors)
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_BinaryVector, vectors),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_BinaryVector, binlogFile.DataType())
data, d, err := binlogFile.ReadBinaryVector()
assert.Nil(t, err)
assert.Equal(t, dim, d)
assert.NotNil(t, data)
assert.Equal(t, vecCount*dim/8, len(data))
for i := 0; i < vecCount; i++ {
for j := 0; j < dim/8; j++ {
assert.Equal(t, vectors[i][j], data[i*dim/8+j])
}
}
binlogFile.Close()
// wrong data type reading
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
dt, d, err := binlogFile.ReadFloatVector()
assert.Zero(t, len(dt))
assert.Zero(t, d)
assert.NotNil(t, err)
binlogFile.Close()
// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
data, d, err = binlogFile.ReadBinaryVector()
assert.Zero(t, len(data))
assert.Zero(t, d)
assert.NotNil(t, err)
binlogFile.Close()
}
func Test_BinlogFileFloatVector(t *testing.T) {
vectors := make([][]float32, 0)
vectors = append(vectors, []float32{1, 3, 5, 7})
vectors = append(vectors, []float32{2, 4, 6, 8})
dim := len(vectors[0])
vecCount := len(vectors)
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": createBinlogBuf(t, schemapb.DataType_FloatVector, vectors),
},
}
binlogFile, err := NewBinlogFile(chunkManager)
assert.Nil(t, err)
assert.NotNil(t, binlogFile)
// correct reading
err = binlogFile.Open("dummy")
assert.Nil(t, err)
assert.Equal(t, schemapb.DataType_FloatVector, binlogFile.DataType())
data, d, err := binlogFile.ReadFloatVector()
assert.Nil(t, err)
assert.Equal(t, dim, d)
assert.NotNil(t, data)
assert.Equal(t, vecCount*dim, len(data))
for i := 0; i < vecCount; i++ {
for j := 0; j < dim; j++ {
assert.Equal(t, vectors[i][j], data[i*dim+j])
}
}
binlogFile.Close()
// wrong data type reading
binlogFile, err = NewBinlogFile(chunkManager)
assert.Nil(t, err)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
dt, err := binlogFile.ReadBool()
assert.Zero(t, len(dt))
assert.NotNil(t, err)
binlogFile.Close()
// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
err = binlogFile.Open("dummy")
assert.Nil(t, err)
data, d, err = binlogFile.ReadFloatVector()
assert.Zero(t, len(data))
assert.Zero(t, d)
assert.NotNil(t, err)
binlogFile.Close()
}


@@ -0,0 +1,238 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
"context"
"errors"
"path"
"sort"
"strconv"
"github.com/milvus-io/milvus/api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"go.uber.org/zap"
)
type BinlogParser struct {
collectionSchema *schemapb.CollectionSchema // collection schema
shardNum int32 // sharding number of the collection
segmentSize int64 // maximum size of a segment(unit:byte)
chunkManager storage.ChunkManager // storage interfaces to browse/read the files
callFlushFunc ImportFlushFunc // call back function to flush segment
// a timestamp that defines the end point of the restore; data written after this point will be ignored
// if this value is 0, all data will be ignored
// if this value is math.MaxUint64, all data will be imported
tsEndPoint uint64
}
func NewBinlogParser(collectionSchema *schemapb.CollectionSchema,
shardNum int32,
segmentSize int64,
chunkManager storage.ChunkManager,
flushFunc ImportFlushFunc,
tsEndPoint uint64) (*BinlogParser, error) {
if collectionSchema == nil {
log.Error("Binlog parser: collection schema is nil")
return nil, errors.New("collection schema is nil")
}
if chunkManager == nil {
log.Error("Binlog parser: chunk manager pointer is nil")
return nil, errors.New("chunk manager pointer is nil")
}
if flushFunc == nil {
log.Error("Binlog parser: flush function is nil")
return nil, errors.New("flush function is nil")
}
v := &BinlogParser{
collectionSchema: collectionSchema,
shardNum: shardNum,
segmentSize: segmentSize,
chunkManager: chunkManager,
callFlushFunc: flushFunc,
tsEndPoint: tsEndPoint,
}
return v, nil
}
// For instance, the insertlogRoot is "backup/bak1/data/insert_log/435978159196147009/435978159196147010".
// 435978159196147009 is a collection id, 435978159196147010 is a partition id,
// there is a segment (id is 435978159261483009) under this partition.
// ListWithPrefix() will return all the insert logs under this partition:
//
// "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/0/435978159903735811"
// "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/1/435978159903735812"
// "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/100/435978159903735809"
// "backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/101/435978159903735810"
//
// The deltalogRoot is "backup/bak1/data/delta_log/435978159196147009/435978159196147010".
// Then we get all the delta logs under this partition:
//
// "backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009/434574382554415105"
//
// In this function, we construct a list of SegmentFilesHolder objects; each SegmentFilesHolder holds the
// insert logs and delta logs of a segment.
func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoot string) ([]*SegmentFilesHolder, error) {
holders := make(map[int64]*SegmentFilesHolder)
// TODO add context
insertlogs, _, err := p.chunkManager.ListWithPrefix(context.TODO(), insertlogRoot, true)
if err != nil {
log.Error("Binlog parser: list insert logs error", zap.Error(err))
return nil, err
}
// collect insert log paths
log.Info("Binlog parser: list insert logs", zap.Int("logsCount", len(insertlogs)))
for _, insertlog := range insertlogs {
log.Info("Binlog parser: mapping insert log to segment", zap.String("insertlog", insertlog))
fieldPath := path.Dir(insertlog)
fieldStrID := path.Base(fieldPath)
fieldID, err := strconv.ParseInt(fieldStrID, 10, 64)
if err != nil {
log.Error("Binlog parser: parse field id error", zap.String("fieldPath", fieldPath), zap.Error(err))
return nil, err
}
segmentPath := path.Dir(fieldPath)
segmentStrID := path.Base(segmentPath)
segmentID, err := strconv.ParseInt(segmentStrID, 10, 64)
if err != nil {
log.Error("Binlog parser: parse segment id error", zap.String("segmentPath", segmentPath), zap.Error(err))
return nil, err
}
holder, ok := holders[segmentID]
if ok {
holder.fieldFiles[fieldID] = append(holder.fieldFiles[fieldID], insertlog)
} else {
holder = &SegmentFilesHolder{
segmentID: segmentID,
fieldFiles: make(map[int64][]string),
deltaFiles: make([]string, 0),
}
holder.fieldFiles[fieldID] = make([]string, 0)
holder.fieldFiles[fieldID] = append(holder.fieldFiles[fieldID], insertlog)
holders[segmentID] = holder
}
}
// sort the insert log paths of each field in ascending order
// there might be several insert logs under a field, for example:
// 2 insert logs under field a: a_1, a_2
// 2 insert logs under field b: b_1, b_2
// the row count of a_1 is equal to that of b_1, and the row count of a_2 is equal to that of b_2
// when we read these logs, we first read a_1 and b_1, then read a_2 and b_2
// so, here we must ensure the paths are arranged correctly
for _, holder := range holders {
for _, v := range holder.fieldFiles {
sort.Strings(v)
}
}
// collect delta log paths
if len(deltalogRoot) > 0 {
// TODO add context
deltalogs, _, err := p.chunkManager.ListWithPrefix(context.TODO(), deltalogRoot, true)
if err != nil {
log.Error("Binlog parser: list delta logs error", zap.Error(err))
return nil, err
}
log.Info("Binlog parser: list delta logs", zap.Int("logsCount", len(deltalogs)))
for _, deltalog := range deltalogs {
log.Info("Binlog parser: mapping delta log to segment", zap.String("deltalog", deltalog))
segmentPath := path.Dir(deltalog)
segmentStrID := path.Base(segmentPath)
segmentID, err := strconv.ParseInt(segmentStrID, 10, 64)
if err != nil {
log.Error("Binlog parser: parse segment id error", zap.String("segmentPath", segmentPath), zap.Error(err))
return nil, err
}
// if the segment id doesn't exist, no need to process this deltalog
holder, ok := holders[segmentID]
if ok {
holder.deltaFiles = append(holder.deltaFiles, deltalog)
}
}
}
holdersList := make([]*SegmentFilesHolder, 0)
for _, holder := range holders {
holdersList = append(holdersList, holder)
}
return holdersList, nil
}
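For the example paths listed in the comment above constructSegmentHolders, the resulting holder for segment 435978159261483009 would look roughly like the following (an illustrative sketch, not part of this commit; SegmentFilesHolder fields are as used in the function above).

holder := &SegmentFilesHolder{
segmentID: 435978159261483009,
fieldFiles: map[int64][]string{
0: {"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/0/435978159903735811"},
1: {"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/1/435978159903735812"},
100: {"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/100/435978159903735809"},
101: {"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/101/435978159903735810"},
},
deltaFiles: []string{"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009/434574382554415105"},
}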
func (p *BinlogParser) parseSegmentFiles(segmentHolder *SegmentFilesHolder) error {
if segmentHolder == nil {
log.Error("Binlog parser: segment files holder is nil")
return errors.New("segment files holder is nil")
}
adapter, err := NewBinlogAdapter(p.collectionSchema, p.shardNum, p.segmentSize,
MaxTotalSizeInMemory, p.chunkManager, p.callFlushFunc, p.tsEndPoint)
if err != nil {
log.Error("Binlog parser: failed to create binlog adapter", zap.Error(err))
return err
}
return adapter.Read(segmentHolder)
}
// This function requires two paths:
// 1. the insert log path of a partition
// 2. the delta log path of a partition (optional)
func (p *BinlogParser) Parse(filePaths []string) error {
if len(filePaths) != 1 && len(filePaths) != 2 {
log.Error("Binlog parser: illegal paths for binlog import")
return errors.New("illegal paths for binlog import, partition binlog path and partition delta path are required")
}
insertlogPath := filePaths[0]
deltalogPath := ""
if len(filePaths) == 2 {
deltalogPath = filePaths[1]
}
log.Info("Binlog parser: target paths",
zap.String("insertlogPath", insertlogPath),
zap.String("deltalogPath", deltalogPath))
segmentHolders, err := p.constructSegmentHolders(insertlogPath, deltalogPath)
if err != nil {
return err
}
for _, segmentHolder := range segmentHolders {
err = p.parseSegmentFiles(segmentHolder)
if err != nil {
return err
}
// trigger gc after each segment is finished
triggerGC()
}
return nil
}
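A hedged usage sketch of the parser (illustrative only, not part of this commit). It assumes a collection schema variable schema and a storage.ChunkManager cm; the flush callback signature matches ImportFlushFunc as used in the tests, and math.MaxUint64 as tsEndPoint means "import all data", per the struct comment above.

flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
// hand the per-shard field data over to the segment writer here
return nil
}
parser, err := NewBinlogParser(schema, 2, 512*1024*1024, cm, flushFunc, math.MaxUint64)
if err != nil {
return err
}
err = parser.Parse([]string{
"backup/bak1/data/insert_log/435978159196147009/435978159196147010", // partition insert log path
"backup/bak1/data/delta_log/435978159196147009/435978159196147010", // partition delta log path (optional)
})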


@@ -0,0 +1,290 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
"errors"
"path"
"strconv"
"testing"
"github.com/milvus-io/milvus/api/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
)
func Test_NewBinlogParser(t *testing.T) {
// nil schema
parser, err := NewBinlogParser(nil, 2, 1024, nil, nil, 0)
assert.Nil(t, parser)
assert.NotNil(t, err)
// nil chunkmanager
parser, err = NewBinlogParser(sampleSchema(), 2, 1024, nil, nil, 0)
assert.Nil(t, parser)
assert.NotNil(t, err)
// nil flushfunc
parser, err = NewBinlogParser(sampleSchema(), 2, 1024, &MockChunkManager{}, nil, 0)
assert.Nil(t, parser)
assert.NotNil(t, err)
// succeed
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
parser, err = NewBinlogParser(sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, 0)
assert.NotNil(t, parser)
assert.Nil(t, err)
}
func Test_BinlogParserConstructHolders(t *testing.T) {
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
chunkManager := &MockChunkManager{
listResult: make(map[string][]string),
}
insertPath := "insertPath"
deltaPath := "deltaPath"
// the first segment has 12 fields, each field has 2 binlog files
seg1Files := []string{
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/0/435978159903735800",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/1/435978159903735801",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/102/435978159903735802",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/103/435978159903735803",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/104/435978159903735804",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/105/435978159903735805",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/106/435978159903735806",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/107/435978159903735807",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/108/435978159903735808",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/109/435978159903735809",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/110/435978159903735810",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/111/435978159903735811",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/0/425978159903735800",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/1/425978159903735801",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/102/425978159903735802",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/103/425978159903735803",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/104/425978159903735804",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/105/425978159903735805",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/106/425978159903735806",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/107/425978159903735807",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/108/425978159903735808",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/109/425978159903735809",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/110/425978159903735810",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/111/425978159903735811",
}
// the second segment has 12 fields, each field has 1 binlog file
seg2Files := []string{
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/0/435978159903735811",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/1/435978159903735812",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/102/435978159903735802",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/103/435978159903735803",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/104/435978159903735804",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/105/435978159903735805",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/106/435978159903735806",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/107/435978159903735807",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/108/435978159903735808",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/109/435978159903735809",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/110/435978159903735810",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009/111/435978159903735811",
}
chunkManager.listResult[insertPath] = append(chunkManager.listResult[insertPath], seg1Files...)
chunkManager.listResult[insertPath] = append(chunkManager.listResult[insertPath], seg2Files...)
// the segment has a delta log file
chunkManager.listResult[deltaPath] = []string{
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009/434574382554415105",
}
parser, err := NewBinlogParser(sampleSchema(), 2, 1024, chunkManager, flushFunc, 0)
assert.NotNil(t, parser)
assert.Nil(t, err)
holders, err := parser.constructSegmentHolders(insertPath, deltaPath)
assert.Nil(t, err)
assert.Equal(t, 2, len(holders))
// verify the first segment
holder := holders[0]
assert.Equal(t, int64(435978159261483008), holder.segmentID)
assert.Equal(t, 12, len(holder.fieldFiles))
for i := 0; i < 12; i++ {
fieldPath := path.Dir(seg1Files[i])
fieldStrID := path.Base(fieldPath)
fieldID, _ := strconv.ParseInt(fieldStrID, 10, 64)
logFiles, ok := holder.fieldFiles[fieldID]
assert.True(t, ok)
assert.Equal(t, 2, len(logFiles))
// verify logs under each field is sorted
log1 := logFiles[0]
logID1 := path.Base(log1)
ID1, _ := strconv.ParseInt(logID1, 10, 64)
log2 := logFiles[1]
logID2 := path.Base(log2)
ID2, _ := strconv.ParseInt(logID2, 10, 64)
assert.LessOrEqual(t, ID1, ID2)
}
assert.Equal(t, 0, len(holder.deltaFiles))
// verify the second segment
holder = holders[1]
assert.Equal(t, int64(435978159261483009), holder.segmentID)
assert.Equal(t, len(seg2Files), len(holder.fieldFiles))
for i := 0; i < len(seg2Files); i++ {
fieldPath := path.Dir(seg2Files[i])
fieldStrID := path.Base(fieldPath)
fieldID, _ := strconv.ParseInt(fieldStrID, 10, 64)
logFiles, ok := holder.fieldFiles[fieldID]
assert.True(t, ok)
assert.Equal(t, 1, len(logFiles))
assert.Equal(t, seg2Files[i], logFiles[0])
}
assert.Equal(t, 1, len(holder.deltaFiles))
assert.Equal(t, chunkManager.listResult[deltaPath][0], holder.deltaFiles[0])
}
func Test_BinlogParserConstructHoldersFailed(t *testing.T) {
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
chunkManager := &MockChunkManager{
listErr: errors.New("error"),
listResult: make(map[string][]string),
}
parser, err := NewBinlogParser(sampleSchema(), 2, 1024, chunkManager, flushFunc, 0)
assert.NotNil(t, parser)
assert.Nil(t, err)
insertPath := "insertPath"
deltaPath := "deltaPath"
// chunkManager return error
holders, err := parser.constructSegmentHolders(insertPath, deltaPath)
assert.NotNil(t, err)
assert.Nil(t, holders)
// parse field id error(insert log)
chunkManager.listErr = nil
chunkManager.listResult[insertPath] = []string{
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/illegal/435978159903735811",
}
holders, err = parser.constructSegmentHolders(insertPath, deltaPath)
assert.NotNil(t, err)
assert.Nil(t, holders)
// parse segment id error(insert log)
chunkManager.listResult[insertPath] = []string{
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/illegal/0/435978159903735811",
}
holders, err = parser.constructSegmentHolders(insertPath, deltaPath)
assert.NotNil(t, err)
assert.Nil(t, holders)
// parse segment id error(delta log)
chunkManager.listResult[insertPath] = []string{}
chunkManager.listResult[deltaPath] = []string{
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/illegal/434574382554415105",
}
holders, err = parser.constructSegmentHolders(insertPath, deltaPath)
assert.NotNil(t, err)
assert.Nil(t, holders)
}
func Test_BinlogParserParseFilesFailed(t *testing.T) {
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
parser, err := NewBinlogParser(sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, 0)
assert.NotNil(t, parser)
assert.Nil(t, err)
err = parser.parseSegmentFiles(nil)
assert.NotNil(t, err)
parser.collectionSchema = nil
err = parser.parseSegmentFiles(&SegmentFilesHolder{})
assert.NotNil(t, err)
}
func Test_BinlogParserParse(t *testing.T) {
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
chunkManager := &MockChunkManager{}
schema := &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 101,
Name: "id",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
}
parser, err := NewBinlogParser(schema, 2, 1024, chunkManager, flushFunc, 0)
assert.NotNil(t, parser)
assert.Nil(t, err)
// zero paths
err = parser.Parse(nil)
assert.NotNil(t, err)
// one empty path
paths := []string{
"insertPath",
}
err = parser.Parse(paths)
assert.Nil(t, err)
// two empty paths
paths = append(paths, "deltaPath")
err = parser.Parse(paths)
assert.Nil(t, err)
// wrong path
chunkManager.listResult = make(map[string][]string)
chunkManager.listResult["insertPath"] = []string{
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/illegal/101/435978159903735811",
}
err = parser.Parse(paths)
assert.NotNil(t, err)
// file not found
chunkManager.listResult["insertPath"] = []string{
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/0/435978159903735811",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/1/435978159903735811",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/101/435978159903735811",
}
err = parser.Parse(paths)
assert.NotNil(t, err)
}


@@ -1,9 +1,26 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
"bufio"
"context"
"errors"
"math"
"path"
"runtime/debug"
"strconv"
@@ -27,7 +44,21 @@ import (
const (
JSONFileExt = ".json"
NumpyFileExt = ".npy"
MaxFileSize = 1 * 1024 * 1024 * 1024 // maximum size of each file
// this limitation is to avoid the following OOM risk:
// for a column-based file, we read all its data into memory; if the user inputs a large file, the read() method may
// consume extra memory and lead to OOM.
MaxFileSize = 1 * 1024 * 1024 * 1024 // 1GB
// this limitation is to avoid the following OOM risk:
// sometimes the system's max segment size is a large number, and a single segment's field data might cause OOM.
// flush the segment when its data reaches this limit, and let compaction merge it later.
MaxSegmentSizeInMemory = 512 * 1024 * 1024 // 512MB
// this limitation is to avoid the following OOM risk:
// if the shard number is large, even though each in-memory segment is small, there are many of them,
// and the total memory size might cause OOM.
MaxTotalSizeInMemory = 2 * 1024 * 1024 * 1024 // 2GB
)
type ImportWrapper struct {
@@ -111,10 +142,17 @@ func getFileNameAndExt(filePath string) (string, string) {
return fileNameWithoutExt, fileType
}
// trigger the golang GC to return all free memory back to the underlying system at once.
// Note: this operation is expensive and can cause latency spikes, as it holds the heap lock for the whole process.
func triggerGC() {
debug.FreeOSMemory()
}
func (p *ImportWrapper) fileValidation(filePaths []string, rowBased bool) error {
// use this map to check for duplicate file names (only for numpy files)
fileNames := make(map[string]struct{})
totalSize := int64(0)
for i := 0; i < len(filePaths); i++ {
filePath := filePaths[i]
name, fileType := getFileNameAndExt(filePath)
@@ -122,6 +160,7 @@ func (p *ImportWrapper) fileValidation(filePaths []string, rowBased bool) error
if ok {
// only check for duplicate numpy files
if fileType == NumpyFileExt {
log.Error("import wrapper: duplicate file name", zap.String("fileName", name+"."+fileType))
return errors.New("duplicate file: " + name + "." + fileType)
}
} else {
@@ -129,25 +168,44 @@ func (p *ImportWrapper) fileValidation(filePaths []string, rowBased bool) error
}
// check file type
// row-based mode only supports the json type; column-based mode supports json and numpy types
if rowBased {
if fileType != JSONFileExt {
log.Error("import wrapper: unsupported file type for row-based mode", zap.String("filePath", filePath))
return errors.New("unsupported file type for row-based mode: " + filePath)
}
} else {
if fileType != JSONFileExt && fileType != NumpyFileExt {
log.Error("import wrapper: unsupported file type for column-based mode", zap.String("filePath", filePath))
return errors.New("unsupported file type for column-based mode: " + filePath)
}
}
// check file size
// check file size, single file size cannot exceed MaxFileSize
// TODO add context
size, _ := p.chunkManager.Size(context.TODO(), filePath)
size, err := p.chunkManager.Size(context.TODO(), filePath)
if err != nil {
log.Error("import wrapper: failed to get file size", zap.String("filePath", filePath), zap.Any("err", err))
return errors.New("failed to ")
}
if size == 0 {
log.Error("import wrapper: file path is empty", zap.String("filePath", filePath))
return errors.New("the file " + filePath + " is empty")
}
if size > MaxFileSize {
return errors.New("the file " + filePath + " size exceeds the maximum file size: " + strconv.FormatInt(MaxFileSize, 10) + " bytes")
log.Error("import wrapper: file size exceeds the maximum size", zap.String("filePath", filePath),
zap.Int64("fileSize", size), zap.Int64("MaxFileSize", MaxFileSize))
return errors.New("the file " + filePath + " size exceeds the maximum size: " + strconv.FormatInt(MaxFileSize, 10) + " bytes")
}
totalSize += size
}
// especially for column-based files, the total size of all files cannot exceed MaxTotalSizeInMemory
if totalSize > MaxTotalSizeInMemory {
log.Error("import wrapper: total size of files exceeds the maximum size", zap.Int64("totalSize", totalSize), zap.Int64("MaxTotalSize", MaxTotalSizeInMemory))
return errors.New("the total size of all files exceeds the maximum size: " + strconv.FormatInt(MaxTotalSizeInMemory, 10) + " bytes")
}
return nil
@ -157,9 +215,17 @@ func (p *ImportWrapper) fileValidation(filePaths []string, rowBased bool) error
// filePath and rowBased are from ImportTask
// if onlyValidate is true, this process only do validation, no data generated, callFlushFunc will not be called
func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate bool) error {
log.Info("import wrapper: filePaths", zap.Any("filePaths", filePaths))
// data restore path to import milvus native binlog files (for backup/restore tools)
// the backup/restore tool provides two paths for a partition: the first is the insert binlog path, the second is the deltalog path
if p.isBinlogImport(filePaths) {
// TODO: handle the timestamp end point passed from client side, currently use math.MaxUint64
return p.doBinlogImport(filePaths, math.MaxUint64)
}
// normal logic for import general data files
err := p.fileValidation(filePaths, rowBased)
if err != nil {
log.Error("import error: " + err.Error())
return err
}
@ -178,8 +244,10 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
log.Error("import error: "+err.Error(), zap.String("filePath", filePath))
return err
}
}
// no need to check else, since the fileValidation() already do this
} // no need to check else, since fileValidation() already does this
// trigger gc after each file finished
triggerGC()
}
} else {
// parse and consume column-based files
@ -187,6 +255,11 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
// after all columns are parsed/consumed, we need to combine map[string]storage.FieldData into one
// and use splitFieldsData() to split fields data into segments according to shard number
fieldsData := initSegmentData(p.collectionSchema)
if fieldsData == nil {
log.Error("import wrapper: failed to initialize FieldData list")
return errors.New("failed to initialize FieldData list")
}
rowCount := 0
// function to combine column data into fieldsData
@ -247,15 +320,23 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
// no need to check else, since fileValidation() already does this
}
// trigger gc after reading is finished
triggerGC()
// split fields data into segments
err := p.splitFieldsData(fieldsData, filePaths)
if err != nil {
log.Error("import error: " + err.Error())
return err
}
// trigger gc after writing is finished
triggerGC()
}
debug.FreeOSMemory()
return p.reportPersisted()
}
func (p *ImportWrapper) reportPersisted() error {
// report file process state
p.importResult.State = commonpb.ImportState_ImportPersisted
// the persisted-state report is important; retry several times so the task does not fail only because of a transient network error
@ -263,12 +344,55 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
return p.reportFunc(p.importResult)
}, retry.Attempts(10))
if reportErr != nil {
log.Warn("fail to report import state to root coord", zap.Error(err))
log.Warn("import wrapper: fail to report import state to root coord", zap.Error(reportErr))
return reportErr
}
return nil
}
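The retry above uses the project's retry utility with retry.Attempts(10); as a rough, dependency-free sketch of the same idea (hypothetical helper, shown only to illustrate the pattern):
// reportWithRetry is a hypothetical stand-in for the retry.Do(...) call above: it retries
// the report callback a fixed number of times so that a transient network error does not
// fail an import task whose data has already been persisted.
func reportWithRetry(report func() error, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = report(); err == nil {
			return nil
		}
	}
	return err
}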
// For internal usage by the restore tool: https://github.com/zilliztech/milvus-backup
// This tool exports data from a milvus service and calls the bulkload interface to import the native data into another milvus service.
// The tool provides two paths: one is the insert log path of a partition, the other is the delta log path of that partition.
// This method checks the filePaths: if the paths exist and are directories rather than files, we treat it as a native binlog import.
func (p *ImportWrapper) isBinlogImport(filePaths []string) bool {
// must contain the insert log path; the delta log path is optional
if len(filePaths) != 1 && len(filePaths) != 2 {
log.Info("import wrapper: paths count is not 1 or 2", zap.Int("len", len(filePaths)))
return false
}
for i := 0; i < len(filePaths); i++ {
filePath := filePaths[i]
_, fileType := getFileNameAndExt(filePath)
// has a file extension, so it is not a directory path
if len(fileType) != 0 {
log.Info("import wrapper: not a path", zap.String("filePath", filePath), zap.String("fileType", fileType))
return false
}
}
log.Info("import wrapper: do binlog import")
return true
}
func (p *ImportWrapper) doBinlogImport(filePaths []string, tsEndPoint uint64) error {
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
p.printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
return p.callFlushFunc(fields, shardID)
}
parser, err := NewBinlogParser(p.collectionSchema, p.shardNum, p.segmentSize, p.chunkManager, flushFunc, tsEndPoint)
if err != nil {
return err
}
err = parser.Parse(filePaths)
if err != nil {
return err
}
return p.reportPersisted()
}
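For the backup/restore use case, the caller passes directory paths rather than data files; a hedged usage sketch with placeholder paths (the wrapper construction and the path layout are assumptions, not part of this patch):
// Hypothetical usage: the first path is the partition's insert-log root, the optional
// second path is its delta-log root. Neither carries a file extension, so
// isBinlogImport() returns true and Import() routes the call to doBinlogImport().
paths := []string{
	"backup/insert_log/<collectionID>/<partitionID>", // example insert log root
	"backup/delta_log/<collectionID>/<partitionID>",  // example delta log root (optional)
}
if err := wrapper.Import(paths, false, false); err != nil {
	log.Error("import wrapper: binlog import failed", zap.Error(err))
}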
func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) error {
tr := timerecord.NewTimeRecorder("json row-based parser: " + filePath)
@ -483,6 +607,7 @@ func (p *ImportWrapper) appendFunc(schema *schemapb.FieldSchema) func(src storag
func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.FieldData, files []string) error {
if len(fieldsData) == 0 {
log.Error("import wrapper: fields data is empty")
return errors.New("import error: fields data is empty")
}
@ -504,6 +629,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
if !schema.GetAutoID() {
v, ok := fieldsData[schema.GetFieldID()]
if !ok {
log.Error("import wrapper: field not provided", zap.String("fieldName", schema.GetName()))
return errors.New("import error: field " + schema.GetName() + " not provided")
}
rowCounter[schema.GetName()] = v.RowNum()
@ -513,17 +639,21 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
}
}
if primaryKey == nil {
log.Error("import wrapper: primary key field is not found")
return errors.New("import error: primary key field is not found")
}
for name, count := range rowCounter {
if count != rowCount {
log.Error("import wrapper: field row count is not equal to other fields row count", zap.String("fieldName", name),
zap.Int("rowCount", count), zap.Int("otherRowCount", rowCount))
return errors.New("import error: field " + name + " row count " + strconv.Itoa(count) + " is not equal to other fields row count " + strconv.Itoa(rowCount))
}
}
primaryData, ok := fieldsData[primaryKey.GetFieldID()]
if !ok {
log.Error("import wrapper: primary key field is not provided", zap.String("keyName", primaryKey.GetName()))
return errors.New("import error: primary key field is not provided")
}
@ -550,7 +680,8 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
}
if primaryData.RowNum() <= 0 {
return errors.New("import error: primary key " + primaryKey.GetName() + " not provided")
log.Error("import wrapper: primary key not provided", zap.String("keyName", primaryKey.GetName()))
return errors.New("import wrapper: primary key " + primaryKey.GetName() + " not provided")
}
// prepare segments
@ -558,7 +689,8 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
for i := 0; i < int(p.shardNum); i++ {
segmentData := initSegmentData(p.collectionSchema)
if segmentData == nil {
return nil
log.Error("import wrapper: failed to initialize FieldData list")
return errors.New("failed to initialize FieldData list")
}
segmentsData = append(segmentsData, segmentData)
}
@ -569,7 +701,8 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
schema := p.collectionSchema.Fields[i]
appendFuncErr := p.appendFunc(schema)
if appendFuncErr == nil {
return errors.New("import error: unsupported field data type")
log.Error("import wrapper: unsupported field data type")
return errors.New("import wrapper: unsupported field data type")
}
appendFunctions[schema.GetName()] = appendFuncErr
}
@ -586,6 +719,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
} else {
intPK, ok := interface{}(pk).(int64)
if !ok {
log.Error("import wrapper: primary key field must be int64 or varchar")
return errors.New("import error: primary key field must be int64 or varchar")
}
hash, _ := typeutil.Hash32Int64(intPK)
@ -615,6 +749,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
p.printFieldsDataInfo(segmentData, "import wrapper: prepare to flush segment", files)
err := p.callFlushFunc(segmentData, i)
if err != nil {
log.Error("import wrapper: flush callback function failed", zap.Any("err", err))
return err
}
}

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
@ -29,7 +45,12 @@ const (
)
type MockChunkManager struct {
size int64
size int64
sizeErr error
readBuf map[string][]byte
readErr error
listResult map[string][]string
listErr error
}
func (mc *MockChunkManager) RootPath() string {
@ -57,7 +78,16 @@ func (mc *MockChunkManager) Exist(ctx context.Context, filePath string) (bool, e
}
func (mc *MockChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) {
return nil, nil
if mc.readErr != nil {
return nil, mc.readErr
}
val, ok := mc.readBuf[filePath]
if !ok {
return nil, errors.New("mock chunk manager: file path not found: " + filePath)
}
return val, nil
}
func (mc *MockChunkManager) MultiRead(ctx context.Context, filePaths []string) ([][]byte, error) {
@ -65,6 +95,15 @@ func (mc *MockChunkManager) MultiRead(ctx context.Context, filePaths []string) (
}
func (mc *MockChunkManager) ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error) {
if mc.listErr != nil {
return nil, nil, mc.listErr
}
result, ok := mc.listResult[prefix]
if ok {
return result, nil, nil
}
return nil, nil, nil
}
@ -81,6 +120,10 @@ func (mc *MockChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.Re
}
func (mc *MockChunkManager) Size(ctx context.Context, filePath string) (int64, error) {
if mc.sizeErr != nil {
return 0, mc.sizeErr
}
return mc.size, nil
}
@ -101,7 +144,6 @@ func Test_NewImportWrapper(t *testing.T) {
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, nil, 2, 1, nil, cm, nil, nil, nil)
assert.Nil(t, wrapper)
@ -127,7 +169,7 @@ func Test_NewImportWrapper(t *testing.T) {
assert.Nil(t, err)
}
func Test_ImportRowBased(t *testing.T) {
func Test_ImportWrapperRowBased(t *testing.T) {
f := dependency.NewDefaultFactory(true)
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
@ -214,7 +256,7 @@ func Test_ImportRowBased(t *testing.T) {
assert.NotNil(t, err)
}
func Test_ImportColumnBased_json(t *testing.T) {
func Test_ImportWrapperColumnBased_json(t *testing.T) {
f := dependency.NewDefaultFactory(true)
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
@ -314,7 +356,7 @@ func Test_ImportColumnBased_json(t *testing.T) {
assert.NotNil(t, err)
}
func Test_ImportColumnBased_StringKey(t *testing.T) {
func Test_ImportWrapperColumnBased_StringKey(t *testing.T) {
f := dependency.NewDefaultFactory(true)
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
@ -381,7 +423,7 @@ func Test_ImportColumnBased_StringKey(t *testing.T) {
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
}
func Test_ImportColumnBased_numpy(t *testing.T) {
func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
f := dependency.NewDefaultFactory(true)
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
@ -519,7 +561,7 @@ func perfSchema(dim int) *schemapb.CollectionSchema {
return schema
}
func Test_ImportRowBased_perf(t *testing.T) {
func Test_ImportWrapperRowBased_perf(t *testing.T) {
f := dependency.NewDefaultFactory(true)
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
@ -620,7 +662,7 @@ func Test_ImportRowBased_perf(t *testing.T) {
tr.Record("parse large json file " + filePath)
}
func Test_ImportColumnBased_perf(t *testing.T) {
func Test_ImportWrapperColumnBased_perf(t *testing.T) {
f := dependency.NewDefaultFactory(true)
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
@ -733,7 +775,7 @@ func Test_ImportColumnBased_perf(t *testing.T) {
tr.Record("parse large json files: " + filePath1 + "," + filePath2)
}
func Test_FileValidation(t *testing.T) {
func Test_ImportWrapperFileValidation(t *testing.T) {
ctx := context.Background()
cm := &MockChunkManager{
@ -748,64 +790,71 @@ func Test_FileValidation(t *testing.T) {
wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil, nil)
// duplicate files
var files = [2]string{"1.npy", "1.npy"}
err := wrapper.fileValidation(files[:], false)
files := []string{"1.npy", "1.npy"}
err := wrapper.fileValidation(files, false)
assert.NotNil(t, err)
err = wrapper.fileValidation(files[:], true)
err = wrapper.fileValidation(files, true)
assert.NotNil(t, err)
// unsupported file name
files[0] = "a/1.npy"
files[1] = "b/1.npy"
err = wrapper.fileValidation(files[:], true)
err = wrapper.fileValidation(files, true)
assert.NotNil(t, err)
err = wrapper.fileValidation(files[:], false)
err = wrapper.fileValidation(files, false)
assert.NotNil(t, err)
// unsupported file type
files[0] = "1"
files[1] = "1"
err = wrapper.fileValidation(files[:], true)
err = wrapper.fileValidation(files, true)
assert.NotNil(t, err)
err = wrapper.fileValidation(files[:], false)
err = wrapper.fileValidation(files, false)
assert.NotNil(t, err)
// valid cases
files[0] = "1.json"
files[1] = "2.json"
err = wrapper.fileValidation(files[:], true)
err = wrapper.fileValidation(files, true)
assert.Nil(t, err)
files[1] = "2.npy"
err = wrapper.fileValidation(files[:], false)
err = wrapper.fileValidation(files, false)
assert.Nil(t, err)
// empty file
cm = &MockChunkManager{
size: 0,
}
cm.size = 0
wrapper = NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil, nil)
err = wrapper.fileValidation(files[:], true)
err = wrapper.fileValidation(files, true)
assert.NotNil(t, err)
err = wrapper.fileValidation(files[:], false)
err = wrapper.fileValidation(files, false)
assert.NotNil(t, err)
// file size exceed limit
cm = &MockChunkManager{
size: MaxFileSize + 1,
}
// file size exceeds the MaxFileSize limit
cm.size = MaxFileSize + 1
wrapper = NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil, nil)
err = wrapper.fileValidation(files[:], true)
err = wrapper.fileValidation(files, true)
assert.NotNil(t, err)
err = wrapper.fileValidation(files[:], false)
err = wrapper.fileValidation(files, false)
assert.NotNil(t, err)
// total size of files exceeds the MaxTotalSizeInMemory limit
cm.size = MaxFileSize - 1
files = append(files, "3.npy")
err = wrapper.fileValidation(files, false)
assert.NotNil(t, err)
// failed to get file size
cm.sizeErr = errors.New("error")
err = wrapper.fileValidation(files, false)
assert.NotNil(t, err)
}
func Test_ReportImportFailRowBased(t *testing.T) {
func Test_ImportWrapperReportFailRowBased(t *testing.T) {
f := dependency.NewDefaultFactory(true)
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
@ -871,7 +920,7 @@ func Test_ReportImportFailRowBased(t *testing.T) {
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
}
func Test_ReportImportFailColumnBased_json(t *testing.T) {
func Test_ImportWrapperReportFailColumnBased_json(t *testing.T) {
f := dependency.NewDefaultFactory(true)
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
@ -952,7 +1001,7 @@ func Test_ReportImportFailColumnBased_json(t *testing.T) {
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
}
func Test_ReportImportFailColumnBased_numpy(t *testing.T) {
func Test_ImportWrapperReportFailColumnBased_numpy(t *testing.T) {
f := dependency.NewDefaultFactory(true)
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
@ -1039,3 +1088,102 @@ func Test_ReportImportFailColumnBased_numpy(t *testing.T) {
assert.Equal(t, 5, rowCount)
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
}
func Test_ImportWrapperIsBinlogImport(t *testing.T) {
ctx := context.Background()
cm := &MockChunkManager{
size: 1,
}
idAllocator := newIDAllocator(ctx, t)
schema := perfSchema(128)
shardNum := 2
segmentSize := 512 // unit: MB
wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil, nil)
paths := []string{}
b := wrapper.isBinlogImport(paths)
assert.False(t, b)
paths = []string{
"path1",
"path2",
"path3",
}
b = wrapper.isBinlogImport(paths)
assert.False(t, b)
paths = []string{
"path1.txt",
"path2.jpg",
}
b = wrapper.isBinlogImport(paths)
assert.False(t, b)
paths = []string{
"/tmp",
"/tmp",
}
b = wrapper.isBinlogImport(paths)
assert.True(t, b)
}
func Test_ImportWrapperDoBinlogImport(t *testing.T) {
ctx := context.Background()
cm := &MockChunkManager{
size: 1,
}
idAllocator := newIDAllocator(ctx, t)
schema := perfSchema(128)
shardNum := 2
segmentSize := 512 // unit: MB
wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil, nil)
paths := []string{
"/tmp",
"/tmp",
}
wrapper.chunkManager = nil
// failed to create new BinlogParser
err := wrapper.doBinlogImport(paths, 0)
assert.NotNil(t, err)
cm.listErr = errors.New("error")
wrapper.chunkManager = cm
wrapper.callFlushFunc = func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
// failed to call parser.Parse()
err = wrapper.doBinlogImport(paths, 0)
assert.NotNil(t, err)
// Import() failed
err = wrapper.Import(paths, false, false)
assert.NotNil(t, err)
cm.listErr = nil
wrapper.reportFunc = func(res *rootcoordpb.ImportResult) error {
return nil
}
wrapper.importResult = &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: 1,
DatanodeId: 1,
State: commonpb.ImportState_ImportStarted,
Segments: make([]int64, 0),
AutoIds: make([]int64, 0),
RowCount: 0,
}
// succeed
err = wrapper.doBinlogImport(paths, 0)
assert.Nil(t, err)
}

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
@ -575,6 +591,10 @@ func (v *JSONRowConsumer) flush(force bool) error {
log.Info("JSON row consumer: flush fulled segment", zap.Int("bytes", memSize), zap.Int("rowNum", rowNum))
v.callFlushFunc(segmentData, i)
v.segmentsData[i] = initSegmentData(v.collectionSchema)
if v.segmentsData[i] == nil {
log.Error("JSON row consumer: fail to initialize in-memory segment data")
return errors.New("fail to initialize in-memory segment data")
}
}
}

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
@ -72,7 +88,10 @@ func sampleSchema() *schemapb.CollectionSchema {
Name: "field_string",
IsPrimaryKey: false,
Description: "string",
DataType: schemapb.DataType_String,
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{Key: "max_length", Value: "128"},
},
},
{
FieldID: 110,
@ -168,7 +187,9 @@ func Test_AdjustBufSize(t *testing.T) {
parser := NewJSONParser(ctx, schema)
assert.NotNil(t, parser)
sizePerRecord, _ := typeutil.EstimateSizePerRecord(schema)
sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
assert.Nil(t, err)
assert.Greater(t, sizePerRecord, 0)
assert.Equal(t, MaxBatchCount, MaxFileSize/(sizePerRecord*int(parser.bufSize)))
// huge row
@ -194,7 +215,7 @@ func Test_AdjustBufSize(t *testing.T) {
assert.Equal(t, int64(MinBufferSize), parser.bufSize)
}
func Test_ParserRows(t *testing.T) {
func Test_JSONParserParserRows(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -299,7 +320,7 @@ func Test_ParserRows(t *testing.T) {
assert.NotNil(t, err)
}
func Test_ParserColumns(t *testing.T) {
func Test_JSONParserParserColumns(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -423,7 +444,7 @@ func Test_ParserColumns(t *testing.T) {
assert.NotNil(t, err)
}
func Test_ParserRowsStringKey(t *testing.T) {
func Test_JSONParserParserRowsStringKey(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -525,7 +546,7 @@ func Test_ParserRowsStringKey(t *testing.T) {
assert.Equal(t, int64(10), validator.ValidateCount())
}
func Test_ParserColumnsStrKey(t *testing.T) {
func Test_JSONParserParserColumnsStrKey(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
@ -29,7 +45,21 @@ func Test_CreateNumpyFile(t *testing.T) {
assert.NotNil(t, err)
}
func Test_SetByteOrder(t *testing.T) {
func Test_CreateNumpyData(t *testing.T) {
// valid data type
data1 := []float32{1, 2, 3, 4, 5}
buf, err := CreateNumpyData(data1)
assert.NotNil(t, buf)
assert.Nil(t, err)
// invalid data type
data2 := make(map[string]int)
buf, err = CreateNumpyData(data2)
assert.NotNil(t, err)
assert.Nil(t, buf)
}
func Test_NumpyAdapterSetByteOrder(t *testing.T) {
adapter := &NumpyAdapter{
reader: nil,
npyReader: &npy.Reader{},
@ -46,7 +76,7 @@ func Test_SetByteOrder(t *testing.T) {
assert.Equal(t, binary.BigEndian, adapter.order)
}
func Test_ReadError(t *testing.T) {
func Test_NumpyAdapterReadError(t *testing.T) {
adapter := &NumpyAdapter{
reader: nil,
npyReader: nil,
@ -174,7 +204,7 @@ func Test_ReadError(t *testing.T) {
}
}
func Test_Read(t *testing.T) {
func Test_NumpyAdapterRead(t *testing.T) {
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
defer os.RemoveAll(TempFilesPath)

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
@ -42,7 +58,7 @@ func Test_ConvertNumpyType(t *testing.T) {
assert.Equal(t, schemapb.DataType_None, dt)
}
func Test_Validate(t *testing.T) {
func Test_NumpyParserValidate(t *testing.T) {
ctx := context.Background()
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
@ -296,7 +312,7 @@ func Test_Validate(t *testing.T) {
}()
}
func Test_Parse(t *testing.T) {
func Test_NumpyParserParse(t *testing.T) {
ctx := context.Background()
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
@ -488,7 +504,7 @@ func Test_Parse(t *testing.T) {
checkFunc(data10, "field_float_vector", flushFunc)
}
func Test_Parse_perf(t *testing.T) {
func Test_NumpyParserParse_perf(t *testing.T) {
ctx := context.Background()
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)