enhance: add chunk basic impl (#34634)

https://github.com/milvus-io/milvus/issues/35112
This pr would not affect milvus functionality by now.
It implments a Chunk memory layout that looks like 
```
VariableColumn
|offset|offset|offset|
|data|data|data|

```
We maybe move offsets to the beginning and add null bitmaps later but
not in this PR.
And mmap test will also be added in another PR.

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
Bingyi Sun 2024-08-01 10:29:51 +08:00 committed by GitHub
parent e9d61daa3f
commit f229f244d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1250 additions and 0 deletions

View File

@ -24,6 +24,9 @@ set(COMMON_SRC
EasyAssert.cpp
FieldData.cpp
RegexQuery.cpp
ChunkTarget.cpp
Chunk.cpp
ChunkWriter.cpp
)
add_library(milvus_common SHARED ${COMMON_SRC})

View File

@ -0,0 +1,61 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <sys/mman.h>
#include <cstdint>
#include "common/Array.h"
#include "common/Span.h"
#include "common/Types.h"
#include "common/Chunk.h"
namespace milvus {
std::vector<std::string_view>
StringChunk::StringViews() const {
std::vector<std::string_view> ret;
for (int i = 0; i < row_nums_ - 1; i++) {
ret.emplace_back(data_ + offsets_[i], offsets_[i + 1] - offsets_[i]);
}
ret.emplace_back(data_ + offsets_[row_nums_ - 1],
size_ - MMAP_STRING_PADDING - offsets_[row_nums_ - 1]);
return ret;
}
void
ArrayChunk::ConstructViews() {
views_.reserve(row_nums_);
for (int i = 0; i < row_nums_; ++i) {
auto data_ptr = data_ + offsets_[i];
auto next_data_ptr = i == row_nums_ - 1
? data_ + size_ - MMAP_ARRAY_PADDING
: data_ + offsets_[i + 1];
auto offsets_len = lens_[i] * sizeof(uint64_t);
std::vector<uint64_t> element_indices = {};
if (IsStringDataType(element_type_)) {
std::vector<uint64_t> tmp(
reinterpret_cast<uint64_t*>(data_ptr),
reinterpret_cast<uint64_t*>(data_ptr + offsets_len));
element_indices = std::move(tmp);
}
views_.emplace_back(data_ptr + offsets_len,
next_data_ptr - data_ptr - offsets_len,
element_type_,
std::move(element_indices));
}
}
SpanBase
ArrayChunk::Span() const {
return SpanBase(views_.data(), views_.size(), sizeof(ArrayView));
}
} // namespace milvus

View File

@ -0,0 +1,148 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string_view>
#include <utility>
#include <vector>
#include "arrow/array/array_base.h"
#include "arrow/record_batch.h"
#include "common/Array.h"
#include "common/ChunkTarget.h"
#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Span.h"
#include "knowhere/sparse_utils.h"
#include "simdjson/common_defs.h"
#include "sys/mman.h"
namespace milvus {
constexpr size_t MMAP_STRING_PADDING = 1;
constexpr size_t MMAP_ARRAY_PADDING = 1;
class Chunk {
public:
Chunk() = default;
Chunk(int64_t row_nums, char* data, size_t size)
: row_nums_(row_nums), data_(data), size_(size) {
}
virtual ~Chunk() {
munmap(data_, size_);
}
protected:
char* data_;
int64_t row_nums_;
size_t size_;
};
// for fixed size data, includes fixed size array
template <typename T>
class FixedWidthChunk : public Chunk {
public:
FixedWidthChunk(int32_t row_nums, int32_t dim, char* data, size_t size)
: Chunk(row_nums, data, size), dim_(dim){};
milvus::SpanBase
Span() const {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return milvus::SpanBase(
data_ + null_bitmap_bytes_num, row_nums_, sizeof(T) * dim_);
}
private:
int dim_;
};
class StringChunk : public Chunk {
public:
StringChunk() = default;
StringChunk(int32_t row_nums, char* data, size_t size)
: Chunk(row_nums, data, size) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
}
std::vector<std::string_view>
StringViews() const;
protected:
uint64_t* offsets_;
};
using JSONChunk = StringChunk;
class ArrayChunk : public Chunk {
public:
ArrayChunk(int32_t row_nums,
char* data,
size_t size,
milvus::DataType element_type)
: Chunk(row_nums, data, size), element_type_(element_type) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
lens_ = offsets_ + row_nums;
ConstructViews();
}
SpanBase
Span() const;
void
ConstructViews();
private:
milvus::DataType element_type_;
uint64_t* offsets_;
uint64_t* lens_;
std::vector<ArrayView> views_;
};
class SparseFloatVectorChunk : public Chunk {
public:
SparseFloatVectorChunk(int32_t row_nums, char* data, size_t size)
: Chunk(row_nums, data, size) {
vec_.resize(row_nums);
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
auto offsets_ptr =
reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
for (int i = 0; i < row_nums; i++) {
int vec_size = 0;
if (i == row_nums - 1) {
vec_size = size - offsets_ptr[i];
} else {
vec_size = offsets_ptr[i + 1] - offsets_ptr[i];
}
vec_[i] = {
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
(uint8_t*)(data + offsets_ptr[i]),
false};
}
}
const char*
Data() const {
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
}
// only for test
std::vector<knowhere::sparse::SparseRow<float>>&
Vec() {
return vec_;
}
private:
std::vector<knowhere::sparse::SparseRow<float>> vec_;
};
} // namespace milvus

View File

@ -0,0 +1,36 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "common/ChunkFileWriter.h"
#include "common/ChunkWriter.h"
namespace milvus {
ChunkFileWriter::ChunkFileWriter(std::string& file_path)
: file_(File::Open(file_path, O_RDWR | O_CREAT | O_TRUNC)) {
}
void
ChunkFileWriter::write_chunk(std::shared_ptr<arrow::RecordBatchReader> r) {
// FIXME
size_t file_offset = file_.Seek(0, SEEK_END);
auto chunk = create_chunk(field_meta_, dim_, file_, file_offset, r);
// TODO: stat_writer_.write(chunk);
rep_.chunks.push_back(*chunk);
}
FileRep
ChunkFileWriter::finish() {
// TODO: stat_writer_.finish();
// rep_.stat_chunk = stat_writer_.get();
return rep_;
}
} // namespace milvus

View File

@ -0,0 +1,44 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <memory>
#include "arrow/record_batch.h"
#include "arrow/table_builder.h"
#include "common/Chunk.h"
#include "common/ChunkTarget.h"
#include "common/FieldMeta.h"
namespace milvus {
class StatisticsChunkWriter;
class StatisticsChunk;
class ChunkFileWriter {
public:
ChunkFileWriter() = default;
ChunkFileWriter(std::string& file_path);
struct FileRep {
std::vector<Chunk> chunks;
};
void
write_chunk(std::shared_ptr<arrow::RecordBatchReader> r);
FileRep
finish();
private:
FieldMeta& field_meta_;
int dim_;
StatisticsChunkWriter stat_writer_;
File file_;
FileRep rep_;
};
} // namespace milvus

View File

@ -0,0 +1,74 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <common/ChunkTarget.h>
#include <cstring>
#include "common/EasyAssert.h"
#include <sys/mman.h>
namespace milvus {
void
MemChunkTarget::write(const void* data, size_t size, bool append) {
AssertInfo(size + size_ <= cap_, "can not exceed target capacity");
std::memcpy(data_ + size_, data, size);
size_ += append ? size : 0;
}
void
MemChunkTarget::skip(size_t size) {
size_ += size;
}
void
MemChunkTarget::seek(size_t offset) {
size_ = offset;
}
std::pair<char*, size_t>
MemChunkTarget::get() {
return {data_, cap_};
}
size_t
MemChunkTarget::tell() {
return size_;
}
void
MmapChunkTarget::write(const void* data, size_t size, bool append) {
auto n = file_.Write(data, size);
AssertInfo(n != -1, "failed to write data to file");
size_ += append ? size : 0;
}
void
MmapChunkTarget::skip(size_t size) {
file_.Seek(size, SEEK_CUR);
size_ += size;
}
void
MmapChunkTarget::seek(size_t offset) {
file_.Seek(offset_ + offset, SEEK_SET);
}
std::pair<char*, size_t>
MmapChunkTarget::get() {
auto m = mmap(
nullptr, size_, PROT_READ, MAP_SHARED, file_.Descriptor(), offset_);
return {(char*)m, size_};
}
size_t
MmapChunkTarget::tell() {
return size_;
}
} // namespace milvus

View File

@ -0,0 +1,96 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstddef>
#include "common/File.h"
namespace milvus {
class ChunkTarget {
public:
virtual void
write(const void* data, size_t size, bool append = true) = 0;
virtual void
skip(size_t size) = 0;
virtual void
seek(size_t offset) = 0;
virtual std::pair<char*, size_t>
get() = 0;
virtual ~ChunkTarget() = default;
virtual size_t
tell() = 0;
};
class MmapChunkTarget : public ChunkTarget {
public:
MmapChunkTarget(File& file, size_t offset) : file_(file), offset_(offset) {
}
void
write(const void* data, size_t size, bool append = true) override;
void
skip(size_t size) override;
void
seek(size_t offset) override;
std::pair<char*, size_t>
get() override;
size_t
tell() override;
private:
File& file_;
size_t offset_ = 0;
size_t size_ = 0;
};
class MemChunkTarget : public ChunkTarget {
public:
MemChunkTarget(size_t cap) : cap_(cap) {
data_ = reinterpret_cast<char*>(mmap(nullptr,
cap,
PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANON,
-1,
0));
}
void
write(const void* data, size_t size, bool append = true) override;
void
skip(size_t size) override;
void
seek(size_t offset) override;
std::pair<char*, size_t>
get() override;
size_t
tell() override;
private:
char* data_; // no need to delete in destructor, will be deleted by Chunk
size_t cap_;
size_t size_ = 0;
};
} // namespace milvus

View File

@ -0,0 +1,362 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "common/ChunkWriter.h"
#include <cstdint>
#include <memory>
#include <string_view>
#include <vector>
#include "arrow/array/array_binary.h"
#include "arrow/array/array_primitive.h"
#include "arrow/record_batch.h"
#include "common/Chunk.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Types.h"
#include "common/VectorTrait.h"
#include "simdjson/common_defs.h"
#include "simdjson/padded_string.h"
namespace milvus {
void
StringChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
std::vector<std::string_view> strs;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto data = batch.ValueOrDie()->column(0);
auto array = std::dynamic_pointer_cast<arrow::StringArray>(data);
for (int i = 0; i < array->length(); i++) {
auto str = array->GetView(i);
strs.push_back(str);
size += str.size();
}
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
row_nums_ += array->length();
}
size += sizeof(uint64_t) * row_nums_ + MMAP_STRING_PADDING;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
// chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn, padding
// write null bitmaps
for (auto [data, size] : null_bitmaps) {
if (data == nullptr) {
std::vector<uint8_t> null_bitmap(size, 0xff);
target_->write(null_bitmap.data(), size);
} else {
target_->write(data, size);
}
}
// write data
offsets_pos_ = target_->tell();
target_->skip(sizeof(uint64_t) * row_nums_);
for (auto str : strs) {
offsets_.push_back(target_->tell());
target_->write(str.data(), str.size());
}
}
std::shared_ptr<Chunk>
StringChunkWriter::finish() {
// write padding, maybe not needed anymore
// FIXME
char padding[MMAP_STRING_PADDING];
target_->write(padding, MMAP_STRING_PADDING);
// seek back to write offsets
target_->seek(offsets_pos_);
target_->write(offsets_.data(), offsets_.size() * sizeof(uint64_t));
auto [data, size] = target_->get();
return std::make_shared<StringChunk>(row_nums_, data, size);
}
void
JSONChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
std::vector<Json> jsons;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto data = batch.ValueOrDie()->column(0);
auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(data);
for (int i = 0; i < array->length(); i++) {
auto str = array->GetView(i);
auto json = Json(simdjson::padded_string(str));
size += json.data().size();
jsons.push_back(std::move(json));
}
AssertInfo(data->length() % 8 == 0,
"String length should be multiple of 8");
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
row_nums_ += array->length();
}
size += sizeof(uint64_t) * row_nums_ + simdjson::SIMDJSON_PADDING;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
// chunk layout: null bitmaps, offset1, offset2, ... ,json1, json2, ..., jsonn
// write null bitmaps
for (auto [data, size] : null_bitmaps) {
if (data == nullptr) {
std::vector<uint8_t> null_bitmap(size, 0xff);
target_->write(null_bitmap.data(), size);
} else {
target_->write(data, size);
}
}
offsets_pos_ = target_->tell();
target_->skip(sizeof(uint64_t) * row_nums_);
// write data
for (auto json : jsons) {
offsets_.push_back(target_->tell());
target_->write(json.data().data(), json.data().size());
}
}
std::shared_ptr<Chunk>
JSONChunkWriter::finish() {
char padding[simdjson::SIMDJSON_PADDING];
target_->write(padding, simdjson::SIMDJSON_PADDING);
// write offsets and padding
target_->seek(offsets_pos_);
target_->write(offsets_.data(), offsets_.size() * sizeof(uint64_t));
auto [data, size] = target_->get();
return std::make_shared<JSONChunk>(row_nums_, data, size);
}
void
ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
std::vector<Array> arrays;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto data = batch.ValueOrDie()->column(0);
auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(data);
for (int i = 0; i < array->length(); i++) {
auto str = array->GetView(i);
ScalarArray scalar_array;
scalar_array.ParseFromArray(str.data(), str.size());
auto arr = Array(scalar_array);
size += arr.byte_size();
arrays.push_back(std::move(arr));
// element offsets size
size += sizeof(uint64_t) * arr.length();
}
row_nums_ += array->length();
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
}
auto is_string = IsStringDataType(element_type_);
// offsets + lens
size += is_string ? sizeof(uint64_t) * row_nums_ * 2 + MMAP_ARRAY_PADDING
: sizeof(uint64_t) * row_nums_ + MMAP_ARRAY_PADDING;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
// chunk layout: nullbitmaps, offsets, elem_off1, elem_off2, .. data1, data2, ..., datan, padding
for (auto [data, size] : null_bitmaps) {
if (data == nullptr) {
std::vector<uint8_t> null_bitmap(size, 0xff);
target_->write(null_bitmap.data(), size);
} else {
target_->write(data, size);
}
}
offsets_pos_ = target_->tell();
target_->skip(sizeof(uint64_t) * row_nums_ * 2);
for (auto& arr : arrays) {
// write elements offsets
offsets_.push_back(target_->tell());
if (is_string) {
target_->write(arr.get_offsets().data(),
arr.get_offsets().size() * sizeof(uint64_t));
}
lens_.push_back(arr.length());
target_->write(arr.data(), arr.byte_size());
}
}
std::shared_ptr<Chunk>
ArrayChunkWriter::finish() {
char padding[MMAP_ARRAY_PADDING];
target_->write(padding, MMAP_ARRAY_PADDING);
// write offsets and lens
target_->seek(offsets_pos_);
for (size_t i = 0; i < offsets_.size(); i++) {
target_->write(&offsets_[i], sizeof(uint64_t));
target_->write(&lens_[i], sizeof(uint64_t));
}
auto [data, size] = target_->get();
return std::make_shared<ArrayChunk>(row_nums_, data, size, element_type_);
}
void
SparseFloatVectorChunkWriter::write(
std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
std::vector<std::string_view> strs;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto data = batch.ValueOrDie()->column(0);
auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(data);
for (int i = 0; i < array->length(); i++) {
auto str = array->GetView(i);
strs.push_back(str);
size += str.size();
}
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
row_nums_ += array->length();
}
size += sizeof(uint64_t) * row_nums_;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
// chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn
// write null bitmaps
for (auto [data, size] : null_bitmaps) {
if (data == nullptr) {
std::vector<uint8_t> null_bitmap(size, 0xff);
target_->write(null_bitmap.data(), size);
} else {
target_->write(data, size);
}
}
// write data
offsets_pos_ = target_->tell();
target_->skip(sizeof(uint64_t) * row_nums_);
for (auto str : strs) {
offsets_.push_back(target_->tell());
target_->write(str.data(), str.size());
}
}
std::shared_ptr<Chunk>
SparseFloatVectorChunkWriter::finish() {
// seek back to write offsets
target_->seek(offsets_pos_);
target_->write(offsets_.data(), offsets_.size() * sizeof(uint64_t));
auto [data, size] = target_->get();
return std::make_shared<SparseFloatVectorChunk>(row_nums_, data, size);
}
std::shared_ptr<Chunk>
create_chunk(const FieldMeta& field_meta,
int dim,
std::shared_ptr<arrow::RecordBatchReader> r) {
std::shared_ptr<ChunkWriterBase> w;
switch (field_meta.get_data_type()) {
case milvus::DataType::BOOL: {
w = std::make_shared<ChunkWriter<arrow::BooleanArray, bool>>(dim);
break;
}
case milvus::DataType::INT8: {
w = std::make_shared<ChunkWriter<arrow::Int8Array, int8_t>>(dim);
break;
}
case milvus::DataType::INT16: {
w = std::make_shared<ChunkWriter<arrow::Int16Array, int16_t>>(dim);
break;
}
case milvus::DataType::INT32: {
w = std::make_shared<ChunkWriter<arrow::Int32Array, int32_t>>(dim);
break;
}
case milvus::DataType::INT64: {
w = std::make_shared<ChunkWriter<arrow::Int64Array, int64_t>>(dim);
break;
}
case milvus::DataType::FLOAT: {
w = std::make_shared<ChunkWriter<arrow::FloatArray, float>>(dim);
break;
}
case milvus::DataType::DOUBLE: {
w = std::make_shared<ChunkWriter<arrow::DoubleArray, double>>(dim);
break;
}
case milvus::DataType::VECTOR_FLOAT: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, float>>(dim);
break;
}
case milvus::DataType::VECTOR_BINARY: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, uint8_t>>(dim / 8);
break;
}
case milvus::DataType::VECTOR_FLOAT16: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::fp16>>(dim);
break;
}
case milvus::DataType::VECTOR_BFLOAT16: {
w = std::make_shared<
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::bf16>>(dim);
break;
}
case milvus::DataType::VARCHAR:
case milvus::DataType::STRING: {
w = std::make_shared<StringChunkWriter>();
break;
}
case milvus::DataType::JSON: {
w = std::make_shared<JSONChunkWriter>();
break;
}
case milvus::DataType::ARRAY: {
w = std::make_shared<ArrayChunkWriter>(
field_meta.get_element_type());
break;
}
case milvus::DataType::VECTOR_SPARSE_FLOAT: {
w = std::make_shared<SparseFloatVectorChunkWriter>();
break;
}
default:
PanicInfo(Unsupported, "Unsupported data type");
}
w->write(r);
return w->finish();
}
} // namespace milvus

View File

@ -0,0 +1,239 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <numeric>
#include <vector>
#include "arrow/array/array_primitive.h"
#include "common/ChunkTarget.h"
#include "arrow/record_batch.h"
#include "common/Chunk.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
namespace milvus {
class ChunkWriterBase {
public:
ChunkWriterBase() = default;
ChunkWriterBase(File& file, size_t offset)
: file_(&file), file_offset_(offset) {
}
virtual void
write(std::shared_ptr<arrow::RecordBatchReader> data) = 0;
virtual std::shared_ptr<Chunk>
finish() = 0;
std::pair<char*, size_t>
get_data() {
return target_->get();
}
protected:
int row_nums_ = 0;
File* file_ = nullptr;
size_t file_offset_ = 0;
std::shared_ptr<ChunkTarget> target_;
};
template <typename ArrowType, typename T>
class ChunkWriter : public ChunkWriterBase {
public:
ChunkWriter(int dim) : dim_(dim) {
}
ChunkWriter(int dim, File& file, size_t offset)
: ChunkWriterBase(file, offset), dim_(dim){};
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override {
auto size = 0;
auto row_nums = 0;
auto batch_vec = data->ToRecordBatches().ValueOrDie();
for (auto batch : batch_vec) {
row_nums += batch->num_rows();
auto data = batch->column(0);
auto array = std::dynamic_pointer_cast<ArrowType>(data);
auto null_bitmap_n = (data->length() + 7) / 8;
size += null_bitmap_n + array->length() * dim_ * sizeof(T);
}
row_nums_ = row_nums;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
// chunk layout: nullbitmap, data1, data2, ..., datan
for (auto batch : batch_vec) {
auto data = batch->column(0);
auto null_bitmap = data->null_bitmap_data();
auto null_bitmap_n = (data->length() + 7) / 8;
if (null_bitmap) {
target_->write(null_bitmap, null_bitmap_n);
} else {
std::vector<uint8_t> null_bitmap(null_bitmap_n, 0xff);
target_->write(null_bitmap.data(), null_bitmap_n);
}
}
for (auto batch : batch_vec) {
auto data = batch->column(0);
auto array = std::dynamic_pointer_cast<ArrowType>(data);
auto data_ptr = array->raw_values();
target_->write(data_ptr, array->length() * dim_ * sizeof(T));
}
}
std::shared_ptr<Chunk>
finish() override {
auto [data, size] = target_->get();
return std::make_shared<FixedWidthChunk<T>>(
row_nums_, dim_, data, size);
}
private:
int dim_;
};
template <>
inline void
ChunkWriter<arrow::BooleanArray, bool>::write(
std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
auto row_nums = 0;
auto batch_vec = data->ToRecordBatches().ValueOrDie();
for (auto batch : batch_vec) {
row_nums += batch->num_rows();
auto data = batch->column(0);
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(data);
size += array->length() * dim_;
size += (data->length() + 7) / 8;
}
row_nums_ = row_nums;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
// chunk layout: nullbitmap, data1, data2, ..., datan
for (auto batch : batch_vec) {
auto data = batch->column(0);
auto null_bitmap = data->null_bitmap_data();
auto null_bitmap_n = (data->length() + 7) / 8;
if (null_bitmap) {
target_->write(null_bitmap, null_bitmap_n);
} else {
std::vector<uint8_t> null_bitmap(null_bitmap_n, 0xff);
target_->write(null_bitmap.data(), null_bitmap_n);
}
}
for (auto batch : batch_vec) {
auto data = batch->column(0);
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(data);
for (int i = 0; i < array->length(); i++) {
auto value = array->Value(i);
target_->write(&value, sizeof(bool));
}
}
}
class StringChunkWriter : public ChunkWriterBase {
public:
using ChunkWriterBase::ChunkWriterBase;
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override;
std::shared_ptr<Chunk>
finish() override;
protected:
std::vector<int64_t> offsets_;
size_t offsets_pos_ = 0;
};
class JSONChunkWriter : public ChunkWriterBase {
public:
using ChunkWriterBase::ChunkWriterBase;
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override;
std::shared_ptr<Chunk>
finish() override;
private:
std::vector<int64_t> offsets_;
size_t offsets_pos_ = 0;
};
class ArrayChunkWriter : public ChunkWriterBase {
public:
ArrayChunkWriter(const milvus::DataType element_type)
: element_type_(element_type) {
}
ArrayChunkWriter(const milvus::DataType element_type,
File& file,
size_t offset)
: ChunkWriterBase(file, offset), element_type_(element_type) {
}
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override;
std::shared_ptr<Chunk>
finish() override;
private:
const milvus::DataType element_type_;
std::vector<uint64_t> offsets_;
std::vector<uint64_t> lens_;
size_t offsets_pos_;
};
class SparseFloatVectorChunkWriter : public ChunkWriterBase {
public:
using ChunkWriterBase::ChunkWriterBase;
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override;
std::shared_ptr<Chunk>
finish() override;
private:
uint64_t offsets_pos_ = 0;
std::vector<uint64_t> offsets_;
};
std::shared_ptr<Chunk>
create_chunk(const FieldMeta& field_meta,
int dim,
std::shared_ptr<arrow::RecordBatchReader> r);
std::shared_ptr<Chunk>
create_chunk(const FieldMeta& field_meta,
int dim,
File& file,
size_t file_offset,
std::shared_ptr<arrow::RecordBatchReader> r);
} // namespace milvus

View File

@ -72,6 +72,7 @@ set(MILVUS_TEST_FILES
test_chunk_vector.cpp
test_mmap_chunk_manager.cpp
test_monitor.cpp
test_chunk.cpp
)
if ( INDEX_ENGINE STREQUAL "cardinal" )

View File

@ -0,0 +1,186 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include <arrow/buffer.h>
#include <arrow/io/memory.h>
#include <parquet/arrow/reader.h>
#include <memory>
#include "common/Chunk.h"
#include "common/ChunkWriter.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/FieldMeta.h"
#include "common/Types.h"
#include "storage/Event.h"
#include "storage/Util.h"
#include "test_utils/Constants.h"
#include "test_utils/DataGen.h"
using namespace milvus;
TEST(chunk, test_int64_field) {
FixedVector<int64_t> data = {1, 2, 3, 4, 5};
auto field_data =
milvus::storage::CreateFieldData(storage::DataType::INT64);
field_data->FillFieldData(data.data(), data.size());
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),
ser_data.size() - 2 * sizeof(milvus::Timestamp));
parquet::arrow::FileReaderBuilder reader_builder;
auto s = reader_builder.Open(buffer);
EXPECT_TRUE(s.ok());
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
s = reader_builder.Build(&arrow_reader);
EXPECT_TRUE(s.ok());
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
s = arrow_reader->GetRecordBatchReader(&rb_reader);
EXPECT_TRUE(s.ok());
FieldMeta field_meta(
FieldName("a"), milvus::FieldId(1), DataType::INT64, false);
auto chunk = create_chunk(field_meta, 1, rb_reader);
auto span =
std::dynamic_pointer_cast<FixedWidthChunk<int64_t>>(chunk)->Span();
EXPECT_EQ(span.row_count(), data.size());
for (size_t i = 0; i < data.size(); ++i) {
auto n = *(int64_t*)((char*)span.data() + i * span.element_sizeof());
EXPECT_EQ(n, data[i]);
}
}
TEST(chunk, test_variable_field) {
FixedVector<std::string> data = {
"test1", "test2", "test3", "test4", "test5"};
auto field_data =
milvus::storage::CreateFieldData(storage::DataType::VARCHAR);
field_data->FillFieldData(data.data(), data.size());
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),
ser_data.size() - 2 * sizeof(milvus::Timestamp));
parquet::arrow::FileReaderBuilder reader_builder;
auto s = reader_builder.Open(buffer);
EXPECT_TRUE(s.ok());
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
s = reader_builder.Build(&arrow_reader);
EXPECT_TRUE(s.ok());
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
s = arrow_reader->GetRecordBatchReader(&rb_reader);
EXPECT_TRUE(s.ok());
FieldMeta field_meta(
FieldName("a"), milvus::FieldId(1), DataType::STRING, false);
auto chunk = create_chunk(field_meta, 1, rb_reader);
auto views = std::dynamic_pointer_cast<StringChunk>(chunk)->StringViews();
for (size_t i = 0; i < data.size(); ++i) {
EXPECT_EQ(views[i], data[i]);
}
}
TEST(chunk, test_array) {
milvus::proto::schema::ScalarField field_string_data;
field_string_data.mutable_string_data()->add_data("test_array1");
field_string_data.mutable_string_data()->add_data("test_array2");
field_string_data.mutable_string_data()->add_data("test_array3");
field_string_data.mutable_string_data()->add_data("test_array4");
field_string_data.mutable_string_data()->add_data("test_array5");
auto string_array = Array(field_string_data);
FixedVector<Array> data = {string_array};
auto field_data =
milvus::storage::CreateFieldData(storage::DataType::ARRAY);
field_data->FillFieldData(data.data(), data.size());
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),
ser_data.size() - 2 * sizeof(milvus::Timestamp));
parquet::arrow::FileReaderBuilder reader_builder;
auto s = reader_builder.Open(buffer);
EXPECT_TRUE(s.ok());
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
s = reader_builder.Build(&arrow_reader);
EXPECT_TRUE(s.ok());
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
s = arrow_reader->GetRecordBatchReader(&rb_reader);
EXPECT_TRUE(s.ok());
FieldMeta field_meta(FieldName("a"),
milvus::FieldId(1),
DataType::ARRAY,
DataType::STRING,
false);
auto chunk = create_chunk(field_meta, 1, rb_reader);
auto span = std::dynamic_pointer_cast<ArrayChunk>(chunk)->Span();
EXPECT_EQ(span.row_count(), 1);
auto arr = *(ArrayView*)span.data();
for (size_t i = 0; i < arr.length(); ++i) {
auto str = arr.get_data<std::string>(i);
EXPECT_EQ(str, field_string_data.string_data().data(i));
}
}
TEST(chunk, test_sparse_float) {
auto n_rows = 100;
auto vecs = milvus::segcore::GenerateRandomSparseFloatVector(
n_rows, kTestSparseDim, kTestSparseVectorDensity);
auto field_data = milvus::storage::CreateFieldData(
storage::DataType::VECTOR_SPARSE_FLOAT, false, kTestSparseDim, n_rows);
field_data->FillFieldData(vecs.get(), n_rows);
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),
ser_data.size() - 2 * sizeof(milvus::Timestamp));
parquet::arrow::FileReaderBuilder reader_builder;
auto s = reader_builder.Open(buffer);
EXPECT_TRUE(s.ok());
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
s = reader_builder.Build(&arrow_reader);
EXPECT_TRUE(s.ok());
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
s = arrow_reader->GetRecordBatchReader(&rb_reader);
EXPECT_TRUE(s.ok());
FieldMeta field_meta(FieldName("a"),
milvus::FieldId(1),
DataType::VECTOR_SPARSE_FLOAT,
kTestSparseDim,
"IP",
false);
auto chunk = create_chunk(field_meta, kTestSparseDim, rb_reader);
auto vec = std::dynamic_pointer_cast<SparseFloatVectorChunk>(chunk)->Vec();
for (size_t i = 0; i < n_rows; ++i) {
auto v1 = vec[i];
auto v2 = vecs[i];
EXPECT_EQ(v1.size(), v2.size());
for (size_t j = 0; j < v1.size(); ++j) {
EXPECT_EQ(v1[j].val, v2[j].val);
}
}
}