milvus/internal/storage/utils.go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package storage

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"sort"
	"strconv"

	"github.com/milvus-io/milvus/internal/common"
	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
)

// GetBinlogSize gets the size of a binlog file:
//	normal binlog file: size = file size, error = nil;
//	key does not exist: size = 0, error = nil;
//	key not in binlog format: size = (an inaccurate number), error != nil;
//	failed to read the event reader: size = (an inaccurate number), error != nil;
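//
// A minimal usage sketch (hypothetical DataKV instance; the key path is illustrative):
//
//	size, err := GetBinlogSize(dataKV, "files/insert_log/1/101/5/100/1")
//	if err == nil && size == 0 {
//		// key does not exist
//	}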
func GetBinlogSize(kv kv.DataKV, key string) (int64, error) {
	return kv.GetSize(key)
}

// EstimateMemorySize gets the approximate in-memory size of a binlog file:
//	1. key does not exist: size = 0, error != nil;
//	2. failed to read the event header: size = 0, error != nil;
//	3. invalid event length: size = 0, error != nil;
//	4. failed to read the descriptor event: size = 0, error != nil;
//	5. original_size not in extras: size = 0, error != nil;
//	6. original_size not in int format: size = 0, error != nil;
//	7. normal binlog with original_size: return original_size, error = nil;
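//
// The layout assumed here is a sketch (readEventHeader and ReadDescriptorEvent
// define the authoritative format): the file starts with MagicNumber, followed
// immediately by the descriptor event, whose header carries EventLength and
// whose extras carry original_size:
//
//	| MagicNumber | descriptor event (eventHeader + payload, Extras["original_size"]) | data events ... |
//
// A minimal usage sketch (hypothetical DataKV instance; the key path is illustrative):
//
//	memSize, err := EstimateMemorySize(dataKV, "files/insert_log/1/101/5/100/1")
//	if err != nil {
//		// fall back to the on-disk size if the memory estimate is unavailable
//		memSize, err = GetBinlogSize(dataKV, "files/insert_log/1/101/5/100/1")
//	}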
func EstimateMemorySize(kv kv.DataKV, key string) (int64, error) {
	total := int64(0)

	header := &eventHeader{}
	headerSize := binary.Size(header)
	startPos := binary.Size(MagicNumber)
	endPos := startPos + headerSize

	// read the event header that immediately follows the magic number
	headerContent, err := kv.LoadPartial(key, int64(startPos), int64(endPos))
	if err != nil {
		return total, err
	}

	buffer := bytes.NewBuffer(headerContent)
	header, err = readEventHeader(buffer)
	if err != nil {
		return total, err
	}

	if header.EventLength <= 0 {
		return total, fmt.Errorf("key %v not in binlog format", key)
	}

	// the header's EventLength covers the whole descriptor event, so re-read
	// the full event starting right after the magic number
	desc := &descriptorEvent{}
	endPos = startPos + int(header.EventLength)
	descContent, err := kv.LoadPartial(key, int64(startPos), int64(endPos))
	if err != nil {
		return total, err
	}

	buffer = bytes.NewBuffer(descContent)
	desc, err = ReadDescriptorEvent(buffer)
	if err != nil {
		return total, err
	}

	// original_size records the in-memory size of the written data
	sizeStr, ok := desc.Extras[originalSizeKey]
	if !ok {
		return total, fmt.Errorf("key %v not in extra information", originalSizeKey)
	}

	size, err := strconv.Atoi(fmt.Sprintf("%v", sizeStr))
	if err != nil {
		return total, fmt.Errorf("%v not in valid format, value: %v", originalSizeKey, sizeStr)
	}
	total = int64(size)

	return total, nil
}

//////////////////////////////////////////////////////////////////////////////////////////////////
// checkTsField checks that the insert data contains a timestamp column of type Int64FieldData.
func checkTsField(data *InsertData) bool {
	tsData, ok := data.Data[common.TimeStampField]
	if !ok {
		return false
	}

	_, ok = tsData.(*Int64FieldData)
	return ok
}

// checkRowIDField checks that the insert data contains a row-id column of type Int64FieldData.
func checkRowIDField(data *InsertData) bool {
	rowIDData, ok := data.Data[common.RowIDField]
	if !ok {
		return false
	}

	_, ok = rowIDData.(*Int64FieldData)
	return ok
}

// checkNumRows checks that all columns have the same number of rows.
func checkNumRows(fieldDatas ...FieldData) bool {
	if len(fieldDatas) <= 0 {
		return true
	}

	numRows := fieldDatas[0].RowNum()
	for i := 1; i < len(fieldDatas); i++ {
		if numRows != fieldDatas[i].RowNum() {
			return false
		}
	}

	return true
}

// fieldDataList is a pair of aligned slices: IDs[i] is the field id of datas[i].
// It implements sort.Interface so that both slices can be sorted by field id in lock-step.
type fieldDataList struct {
	IDs   []FieldID
	datas []FieldData
}

func (ls fieldDataList) Len() int {
	return len(ls.IDs)
}

func (ls fieldDataList) Less(i, j int) bool {
	return ls.IDs[i] < ls.IDs[j]
}

func (ls fieldDataList) Swap(i, j int) {
	ls.IDs[i], ls.IDs[j] = ls.IDs[j], ls.IDs[i]
	ls.datas[i], ls.datas[j] = ls.datas[j], ls.datas[i]
}

// sortFieldDataList sorts a fieldDataList by ascending field id.
func sortFieldDataList(ls fieldDataList) {
	sort.Sort(ls)
}
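
// A minimal sketch (hypothetical field ids and data) of how sortFieldDataList
// keeps the two slices aligned:
//
//	ls := fieldDataList{
//		IDs:   []FieldID{102, 100},
//		datas: []FieldData{fd102, fd100}, // fd102/fd100: hypothetical FieldData values
//	}
//	sortFieldDataList(ls)
//	// ls.IDs is now [100, 102] and ls.datas was swapped in lock-step, so the
//	// row assembly below always serializes fields in ascending field-id order.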

// TransferColumnBasedInsertDataToRowBased transfers column-based insert data to row-based rows.
// Note:
//	- the timestamp column must exist in the insert data;
//	- the row-id column must exist in the insert data;
//	- the row count of all columns must be equal;
//	- num_rows = len(RowData); a row is assembled into the blob's value in field-id order;
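//
// A minimal usage sketch (field id 101 and all values are hypothetical; it also
// assumes an Int64FieldData can be built from its Data slice alone, which may
// vary across versions):
//
//	data := &InsertData{Data: map[FieldID]FieldData{
//		common.TimeStampField: &Int64FieldData{Data: []int64{1, 2}},
//		common.RowIDField:     &Int64FieldData{Data: []int64{10, 11}},
//		101:                   &Int64FieldData{Data: []int64{7, 8}},
//	}}
//	tss, rowIDs, rows, err := TransferColumnBasedInsertDataToRowBased(data)
//	// tss = [1 2], rowIDs = [10 11], and each rows[i].Value holds field 101's
//	// i-th value encoded with common.Endian.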
func TransferColumnBasedInsertDataToRowBased(data *InsertData) (
	Timestamps []uint64,
	RowIDs []int64,
	RowData []*commonpb.Blob,
	err error,
) {
	if !checkTsField(data) {
		return nil, nil, nil,
			errors.New("cannot get timestamps from insert data")
	}

	if !checkRowIDField(data) {
		return nil, nil, nil,
			errors.New("cannot get row ids from insert data")
	}

	tss := data.Data[common.TimeStampField].(*Int64FieldData)
	rowIds := data.Data[common.RowIDField].(*Int64FieldData)

	// collect all user fields, excluding the system timestamp and row-id columns
	ls := fieldDataList{
		IDs:   make([]FieldID, 0),
		datas: make([]FieldData, 0),
	}
	for fieldID := range data.Data {
		if fieldID == common.TimeStampField || fieldID == common.RowIDField {
			continue
		}

		ls.IDs = append(ls.IDs, fieldID)
		ls.datas = append(ls.datas, data.Data[fieldID])
	}

	// checkNumRows(tss, rowIds, ls.datas...) // doesn't compile: Go can't mix individual arguments with a slice spread for the same variadic parameter
	all := []FieldData{tss, rowIds}
	all = append(all, ls.datas...)
	if !checkNumRows(all...) {
		return nil, nil, nil,
			errors.New("columns of insert data have different length")
	}

	// serialize fields in ascending field-id order
	sortFieldDataList(ls)

	numRows := tss.RowNum()
	rows := make([]*commonpb.Blob, numRows)
	for i := 0; i < numRows; i++ {
		blob := &commonpb.Blob{
			Value: make([]byte, 0),
		}
		var buffer bytes.Buffer

		for j := 0; j < ls.Len(); j++ {
			d := ls.datas[j].GetRow(i)
			err := binary.Write(&buffer, common.Endian, d)
			if err != nil {
				return nil, nil, nil,
					fmt.Errorf("failed to get binary row, err: %v", err)
			}
		}

		blob.Value = buffer.Bytes()
		rows[i] = blob
	}

	// timestamps are stored as int64 but returned as uint64
	utss := make([]uint64, tss.RowNum())
	for i := 0; i < tss.RowNum(); i++ {
		utss[i] = uint64(tss.Data[i])
	}

	return utss, rowIds.Data, rows, nil
}