mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 11:29:48 +08:00
cherry-pick commit from master: pr: #33875 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
parent
f5a0353fd1
commit
173c02902e
@ -46,4 +46,12 @@ SetCpuNum(const int core);
|
||||
void
|
||||
SetDefaultExecEvalExprBatchSize(int64_t val);
|
||||
|
||||
struct BufferView {
|
||||
char* data_;
|
||||
size_t size_;
|
||||
|
||||
BufferView(char* data_ptr, size_t size) : data_(data_ptr), size_(size) {
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace milvus
|
||||
|
@ -67,6 +67,7 @@ enum ErrorCode {
|
||||
// timeout or cancel related.
|
||||
FollyOtherException = 2037,
|
||||
FollyCancel = 2038,
|
||||
OutOfRange = 2037,
|
||||
KnowhereError = 2100,
|
||||
};
|
||||
namespace impl {
|
||||
|
@ -39,7 +39,7 @@ class File {
|
||||
"failed to create mmap file {}: {}",
|
||||
filepath,
|
||||
strerror(errno));
|
||||
return File(fd);
|
||||
return File(fd, std::string(filepath));
|
||||
}
|
||||
|
||||
int
|
||||
@ -47,11 +47,22 @@ class File {
|
||||
return fd_;
|
||||
}
|
||||
|
||||
std::string
|
||||
Path() const {
|
||||
return filepath_;
|
||||
}
|
||||
|
||||
ssize_t
|
||||
Write(const void* buf, size_t size) {
|
||||
return write(fd_, buf, size);
|
||||
}
|
||||
|
||||
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
|
||||
ssize_t
|
||||
WriteInt(T value) {
|
||||
return write(fd_, &value, sizeof(value));
|
||||
}
|
||||
|
||||
offset_t
|
||||
Seek(offset_t offset, int whence) {
|
||||
return lseek(fd_, offset, whence);
|
||||
@ -64,8 +75,10 @@ class File {
|
||||
}
|
||||
|
||||
private:
|
||||
explicit File(int fd) : fd_(fd) {
|
||||
explicit File(int fd, const std::string& filepath)
|
||||
: fd_(fd), filepath_(filepath) {
|
||||
}
|
||||
int fd_{-1};
|
||||
std::string filepath_;
|
||||
};
|
||||
} // namespace milvus
|
||||
|
@ -77,7 +77,7 @@ PhyCompareFilterExpr::GetChunkData<std::string>(FieldId field_id,
|
||||
return [chunk_data](int i) -> const number { return chunk_data[i]; };
|
||||
} else {
|
||||
auto chunk_data =
|
||||
segment_->chunk_data<std::string_view>(field_id, chunk_id).data();
|
||||
segment_->chunk_view<std::string_view>(field_id, chunk_id).data();
|
||||
return [chunk_data](int i) -> const number {
|
||||
return std::string(chunk_data[i]);
|
||||
};
|
||||
|
@ -26,7 +26,7 @@ PhyExistsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
|
||||
case DataType::JSON: {
|
||||
if (is_index_mode_) {
|
||||
PanicInfo(ExprInvalid,
|
||||
"exists expr for json index mode not supportted");
|
||||
"exists expr for json index mode not supported");
|
||||
}
|
||||
result = EvalJsonExistsForDataSegment();
|
||||
break;
|
||||
|
@ -189,6 +189,32 @@ class SegmentExpr : public Expr {
|
||||
: batch_size_;
|
||||
}
|
||||
|
||||
// used for processing raw data expr for sealed segments.
|
||||
// now only used for std::string_view && json
|
||||
// TODO: support more types
|
||||
template <typename T, typename FUNC, typename... ValTypes>
|
||||
int64_t
|
||||
ProcessChunkForSealedSeg(
|
||||
FUNC func,
|
||||
std::function<bool(const milvus::SkipIndex&, FieldId, int)> skip_func,
|
||||
TargetBitmapView res,
|
||||
ValTypes... values) {
|
||||
// For sealed segment, only single chunk
|
||||
Assert(num_data_chunk_ == 1);
|
||||
auto need_size =
|
||||
std::min(active_count_ - current_data_chunk_pos_, batch_size_);
|
||||
|
||||
auto& skip_index = segment_->GetSkipIndex();
|
||||
if (!skip_func || !skip_func(skip_index, field_id_, 0)) {
|
||||
auto data_vec = segment_->get_batch_views<T>(
|
||||
field_id_, 0, current_data_chunk_pos_, need_size);
|
||||
|
||||
func(data_vec.data(), need_size, res, values...);
|
||||
}
|
||||
current_data_chunk_pos_ += need_size;
|
||||
return need_size;
|
||||
}
|
||||
|
||||
template <typename T, typename FUNC, typename... ValTypes>
|
||||
int64_t
|
||||
ProcessDataChunks(
|
||||
@ -197,6 +223,15 @@ class SegmentExpr : public Expr {
|
||||
TargetBitmapView res,
|
||||
ValTypes... values) {
|
||||
int64_t processed_size = 0;
|
||||
|
||||
if constexpr (std::is_same_v<T, std::string_view> ||
|
||||
std::is_same_v<T, Json>) {
|
||||
if (segment_->type() == SegmentType::Sealed) {
|
||||
return ProcessChunkForSealedSeg<T>(
|
||||
func, skip_func, res, values...);
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
|
||||
auto data_pos =
|
||||
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
|
||||
@ -431,4 +466,4 @@ class ExprSet {
|
||||
};
|
||||
|
||||
} //namespace exec
|
||||
} // namespace milvus
|
||||
} // namespace milvus
|
||||
|
@ -35,7 +35,7 @@ PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
|
||||
if (is_index_mode_) {
|
||||
PanicInfo(
|
||||
ExprInvalid,
|
||||
"exists expr for json or array index mode not supportted");
|
||||
"exists expr for json or array index mode not supported");
|
||||
}
|
||||
result = EvalJsonContainsForDataSegment();
|
||||
break;
|
||||
|
@ -26,6 +26,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "common/Array.h"
|
||||
#include "common/Common.h"
|
||||
#include "common/EasyAssert.h"
|
||||
#include "common/File.h"
|
||||
#include "common/FieldMeta.h"
|
||||
@ -52,6 +53,8 @@ namespace milvus {
|
||||
constexpr size_t STRING_PADDING = 1;
|
||||
constexpr size_t ARRAY_PADDING = 1;
|
||||
|
||||
constexpr size_t BLOCK_SIZE = 8192;
|
||||
|
||||
class ColumnBase {
|
||||
public:
|
||||
enum MappingType {
|
||||
@ -223,6 +226,19 @@ class ColumnBase {
|
||||
virtual SpanBase
|
||||
Span() const = 0;
|
||||
|
||||
// used for sequential access for search
|
||||
virtual BufferView
|
||||
GetBatchBuffer(int64_t start_offset, int64_t length) {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"GetBatchBuffer only supported for VariableColumn");
|
||||
}
|
||||
|
||||
virtual std::vector<std::string_view>
|
||||
StringViews() const {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"StringViews only supported for VariableColumn");
|
||||
}
|
||||
|
||||
virtual void
|
||||
AppendBatch(const FieldDataPtr data) {
|
||||
size_t required_size = size_ + data->Size();
|
||||
@ -557,40 +573,94 @@ class VariableColumn : public ColumnBase {
|
||||
}
|
||||
|
||||
VariableColumn(VariableColumn&& column) noexcept
|
||||
: ColumnBase(std::move(column)),
|
||||
indices_(std::move(column.indices_)),
|
||||
views_(std::move(column.views_)) {
|
||||
: ColumnBase(std::move(column)), indices_(std::move(column.indices_)) {
|
||||
}
|
||||
|
||||
~VariableColumn() override = default;
|
||||
|
||||
SpanBase
|
||||
Span() const override {
|
||||
return SpanBase(views_.data(), views_.size(), sizeof(ViewType));
|
||||
PanicInfo(ErrorCode::NotImplemented,
|
||||
"span() interface is not implemented for variable column");
|
||||
}
|
||||
|
||||
[[nodiscard]] const std::vector<ViewType>&
|
||||
std::vector<std::string_view>
|
||||
StringViews() const override {
|
||||
std::vector<std::string_view> res;
|
||||
char* pos = data_;
|
||||
for (size_t i = 0; i < num_rows_; ++i) {
|
||||
uint32_t size;
|
||||
size = *reinterpret_cast<uint32_t*>(pos);
|
||||
pos += sizeof(uint32_t);
|
||||
res.emplace_back(std::string_view(pos, size));
|
||||
pos += size;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
[[nodiscard]] std::vector<ViewType>
|
||||
Views() const {
|
||||
return views_;
|
||||
std::vector<ViewType> res;
|
||||
char* pos = data_;
|
||||
for (size_t i = 0; i < num_rows_; ++i) {
|
||||
uint32_t size;
|
||||
size = *reinterpret_cast<uint32_t*>(pos);
|
||||
pos += sizeof(uint32_t);
|
||||
res.emplace_back(ViewType(pos, size));
|
||||
pos += size;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
BufferView
|
||||
GetBatchBuffer(int64_t start_offset, int64_t length) override {
|
||||
if (start_offset < 0 || start_offset > num_rows_ ||
|
||||
start_offset + length > num_rows_) {
|
||||
PanicInfo(ErrorCode::OutOfRange, "index out of range");
|
||||
}
|
||||
|
||||
char* pos = data_ + indices_[start_offset / BLOCK_SIZE];
|
||||
for (size_t j = 0; j < start_offset % BLOCK_SIZE; j++) {
|
||||
uint32_t size;
|
||||
size = *reinterpret_cast<uint32_t*>(pos);
|
||||
pos += sizeof(uint32_t) + size;
|
||||
}
|
||||
|
||||
return BufferView{pos, size_ - (pos - data_)};
|
||||
}
|
||||
|
||||
ViewType
|
||||
operator[](const int i) const {
|
||||
return views_[i];
|
||||
if (i < 0 || i > num_rows_) {
|
||||
PanicInfo(ErrorCode::OutOfRange, "index out of range");
|
||||
}
|
||||
size_t batch_id = i / BLOCK_SIZE;
|
||||
size_t offset = i % BLOCK_SIZE;
|
||||
|
||||
// located in batch start location
|
||||
char* pos = data_ + indices_[batch_id];
|
||||
for (size_t j = 0; j < offset; j++) {
|
||||
uint32_t size;
|
||||
size = *reinterpret_cast<uint32_t*>(pos);
|
||||
pos += sizeof(uint32_t) + size;
|
||||
}
|
||||
|
||||
uint32_t size;
|
||||
size = *reinterpret_cast<uint32_t*>(pos);
|
||||
return ViewType(pos + sizeof(uint32_t), size);
|
||||
}
|
||||
|
||||
std::string_view
|
||||
RawAt(const int i) const {
|
||||
return std::string_view(views_[i]);
|
||||
return std::string_view((*this)[i]);
|
||||
}
|
||||
|
||||
void
|
||||
Append(FieldDataPtr chunk) {
|
||||
for (auto i = 0; i < chunk->get_num_rows(); i++) {
|
||||
auto data = static_cast<const T*>(chunk->RawValue(i));
|
||||
|
||||
indices_.emplace_back(size_);
|
||||
size_ += data->size();
|
||||
auto data = static_cast<const T*>(chunk->RawValue(i));
|
||||
size_ += sizeof(uint32_t) + data->size();
|
||||
}
|
||||
load_buf_.emplace(std::move(chunk));
|
||||
}
|
||||
@ -613,40 +683,42 @@ class VariableColumn : public ColumnBase {
|
||||
auto chunk = std::move(load_buf_.front());
|
||||
load_buf_.pop();
|
||||
|
||||
// data_ as: |size|data|size|data......
|
||||
for (auto i = 0; i < chunk->get_num_rows(); i++) {
|
||||
auto current_size = (uint32_t)chunk->Size(i);
|
||||
std::memcpy(data_ + size_, ¤t_size, sizeof(uint32_t));
|
||||
size_ += sizeof(uint32_t);
|
||||
auto data = static_cast<const T*>(chunk->RawValue(i));
|
||||
std::copy_n(data->c_str(), data->size(), data_ + size_);
|
||||
std::memcpy(data_ + size_, data->c_str(), data->size());
|
||||
size_ += data->size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ConstructViews();
|
||||
|
||||
// Not need indices_ after
|
||||
indices_.clear();
|
||||
std::vector<uint64_t>().swap(indices_);
|
||||
shrink_indice();
|
||||
}
|
||||
|
||||
protected:
|
||||
void
|
||||
ConstructViews() {
|
||||
views_.reserve(indices_.size());
|
||||
for (size_t i = 0; i < indices_.size() - 1; i++) {
|
||||
views_.emplace_back(data_ + indices_[i],
|
||||
indices_[i + 1] - indices_[i]);
|
||||
shrink_indice() {
|
||||
std::vector<uint64_t> tmp_indices;
|
||||
tmp_indices.reserve((indices_.size() + BLOCK_SIZE - 1) / BLOCK_SIZE);
|
||||
|
||||
for (size_t i = 0; i < indices_.size();) {
|
||||
tmp_indices.push_back(indices_[i]);
|
||||
i += BLOCK_SIZE;
|
||||
}
|
||||
views_.emplace_back(data_ + indices_.back(), size_ - indices_.back());
|
||||
|
||||
indices_.swap(tmp_indices);
|
||||
}
|
||||
|
||||
private:
|
||||
// loading states
|
||||
std::queue<FieldDataPtr> load_buf_{};
|
||||
|
||||
// raw data index, record indices located 0, interval, 2 * interval, 3 * interval
|
||||
// ... just like page index, interval set to 8192 that matches search engine's batch size
|
||||
std::vector<uint64_t> indices_{};
|
||||
|
||||
// Compatible with current Span type
|
||||
std::vector<ViewType> views_{};
|
||||
};
|
||||
|
||||
class ArrayColumn : public ColumnBase {
|
||||
|
@ -33,47 +33,72 @@
|
||||
|
||||
namespace milvus {
|
||||
|
||||
inline size_t
|
||||
#define THROW_FILE_WRITE_ERROR \
|
||||
PanicInfo(ErrorCode::FileWriteFailed, \
|
||||
fmt::format("write data to file {} failed, error code {}", \
|
||||
file.Path(), \
|
||||
strerror(errno)));
|
||||
|
||||
inline void
|
||||
WriteFieldData(File& file,
|
||||
DataType data_type,
|
||||
const FieldDataPtr& data,
|
||||
uint64_t& total_written,
|
||||
std::vector<uint64_t>& indices,
|
||||
std::vector<std::vector<uint64_t>>& element_indices) {
|
||||
size_t total_written{0};
|
||||
if (IsVariableDataType(data_type)) {
|
||||
switch (data_type) {
|
||||
case DataType::VARCHAR:
|
||||
case DataType::STRING: {
|
||||
// write as: |size|data|size|data......
|
||||
for (auto i = 0; i < data->get_num_rows(); ++i) {
|
||||
indices.push_back(total_written);
|
||||
auto str =
|
||||
static_cast<const std::string*>(data->RawValue(i));
|
||||
ssize_t written = file.Write(str->data(), str->size());
|
||||
if (written < str->size()) {
|
||||
break;
|
||||
ssize_t written_data_size =
|
||||
file.WriteInt<uint32_t>(uint32_t(str->size()));
|
||||
if (written_data_size != sizeof(uint32_t)) {
|
||||
THROW_FILE_WRITE_ERROR
|
||||
}
|
||||
total_written += written;
|
||||
total_written += written_data_size;
|
||||
auto written_data = file.Write(str->data(), str->size());
|
||||
if (written_data < str->size()) {
|
||||
THROW_FILE_WRITE_ERROR
|
||||
}
|
||||
total_written += written_data;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case DataType::JSON: {
|
||||
// write as: |size|data|size|data......
|
||||
for (ssize_t i = 0; i < data->get_num_rows(); ++i) {
|
||||
indices.push_back(total_written);
|
||||
auto padded_string =
|
||||
static_cast<const Json*>(data->RawValue(i))->data();
|
||||
ssize_t written =
|
||||
file.Write(padded_string.data(), padded_string.size());
|
||||
if (written < padded_string.size()) {
|
||||
break;
|
||||
ssize_t written_data_size =
|
||||
file.WriteInt<uint32_t>(uint32_t(padded_string.size()));
|
||||
if (written_data_size != sizeof(uint32_t)) {
|
||||
THROW_FILE_WRITE_ERROR
|
||||
}
|
||||
total_written += written;
|
||||
total_written += written_data_size;
|
||||
ssize_t written_data =
|
||||
file.Write(padded_string.data(), padded_string.size());
|
||||
if (written_data < padded_string.size()) {
|
||||
THROW_FILE_WRITE_ERROR
|
||||
}
|
||||
total_written += written_data;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case DataType::ARRAY: {
|
||||
// write as: |data|data|data|data|data......
|
||||
for (size_t i = 0; i < data->get_num_rows(); ++i) {
|
||||
indices.push_back(total_written);
|
||||
auto array = static_cast<const Array*>(data->RawValue(i));
|
||||
ssize_t written =
|
||||
file.Write(array->data(), array->byte_size());
|
||||
if (written < array->byte_size()) {
|
||||
break;
|
||||
THROW_FILE_WRITE_ERROR
|
||||
}
|
||||
element_indices.emplace_back(array->get_offsets());
|
||||
total_written += written;
|
||||
@ -93,9 +118,15 @@ WriteFieldData(File& file,
|
||||
GetDataTypeName(data_type));
|
||||
}
|
||||
} else {
|
||||
total_written += file.Write(data->Data(), data->Size());
|
||||
// write as: data|data|data|data|data|data......
|
||||
size_t written = file.Write(data->Data(), data->Size());
|
||||
if (written < data->Size()) {
|
||||
THROW_FILE_WRITE_ERROR
|
||||
}
|
||||
for (auto i = 0; i < data->get_num_rows(); i++) {
|
||||
indices.emplace_back(total_written);
|
||||
total_written += data->Size(i);
|
||||
}
|
||||
}
|
||||
|
||||
return total_written;
|
||||
}
|
||||
} // namespace milvus
|
||||
|
@ -58,7 +58,7 @@ template <typename T>
|
||||
class SealedDataGetter : public DataGetter<T> {
|
||||
private:
|
||||
std::shared_ptr<Span<T>> field_data_;
|
||||
std::shared_ptr<Span<std::string_view>> str_field_data_;
|
||||
std::shared_ptr<std::vector<std::string_view>> str_field_data_;
|
||||
const index::ScalarIndex<T>* field_index_;
|
||||
|
||||
public:
|
||||
@ -66,9 +66,9 @@ class SealedDataGetter : public DataGetter<T> {
|
||||
FieldId& field_id) {
|
||||
if (segment.HasFieldData(field_id)) {
|
||||
if constexpr (std::is_same_v<T, std::string>) {
|
||||
auto span = segment.chunk_data<std::string_view>(field_id, 0);
|
||||
str_field_data_ = std::make_shared<Span<std::string_view>>(
|
||||
span.data(), span.row_count());
|
||||
str_field_data_ =
|
||||
std::make_shared<std::vector<std::string_view>>(
|
||||
segment.chunk_view<std::string_view>(field_id, 0));
|
||||
} else {
|
||||
auto span = segment.chunk_data<T>(field_id, 0);
|
||||
field_data_ =
|
||||
|
@ -421,6 +421,12 @@ SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
|
||||
return vec->get_span_base(chunk_id);
|
||||
}
|
||||
|
||||
std::vector<std::string_view>
|
||||
SegmentGrowingImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
|
||||
PanicInfo(ErrorCode::NotImplemented,
|
||||
"chunk view impl not implement for growing segment");
|
||||
}
|
||||
|
||||
int64_t
|
||||
SegmentGrowingImpl::num_chunk() const {
|
||||
auto size = get_insert_record().ack_responder_.GetAck();
|
||||
|
@ -299,6 +299,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
limit, bitset, false_filtered_out);
|
||||
}
|
||||
|
||||
bool
|
||||
is_mmap_field(FieldId id) const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected:
|
||||
int64_t
|
||||
num_chunk() const override;
|
||||
@ -306,6 +311,19 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
SpanBase
|
||||
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;
|
||||
|
||||
std::vector<std::string_view>
|
||||
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;
|
||||
|
||||
BufferView
|
||||
get_chunk_buffer(FieldId field_id,
|
||||
int64_t chunk_id,
|
||||
int64_t start_offset,
|
||||
int64_t length) const override {
|
||||
PanicInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"get_chunk_buffer interface not supported for growing segment");
|
||||
}
|
||||
|
||||
void
|
||||
check_search(const query::Plan* plan) const override {
|
||||
Assert(plan);
|
||||
|
@ -138,6 +138,47 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
return static_cast<Span<T>>(chunk_data_impl(field_id, chunk_id));
|
||||
}
|
||||
|
||||
template <typename ViewType>
|
||||
std::vector<ViewType>
|
||||
chunk_view(FieldId field_id, int64_t chunk_id) const {
|
||||
auto string_views = chunk_view_impl(field_id, chunk_id);
|
||||
if constexpr (std::is_same_v<ViewType, std::string_view>) {
|
||||
return std::move(string_views);
|
||||
} else {
|
||||
std::vector<ViewType> res;
|
||||
res.reserve(string_views.size());
|
||||
for (const auto& view : string_views) {
|
||||
res.emplace_back(view);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename ViewType>
|
||||
std::vector<ViewType>
|
||||
get_batch_views(FieldId field_id,
|
||||
int64_t chunk_id,
|
||||
int64_t start_offset,
|
||||
int64_t length) const {
|
||||
if (this->type() == SegmentType::Growing) {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"get chunk views not supported for growing segment");
|
||||
}
|
||||
BufferView buffer =
|
||||
get_chunk_buffer(field_id, chunk_id, start_offset, length);
|
||||
std::vector<ViewType> res;
|
||||
res.reserve(length);
|
||||
char* pos = buffer.data_;
|
||||
for (size_t j = 0; j < length; j++) {
|
||||
uint32_t size;
|
||||
size = *reinterpret_cast<uint32_t*>(pos);
|
||||
pos += sizeof(uint32_t);
|
||||
res.emplace_back(ViewType(pos, size));
|
||||
pos += size;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
const index::ScalarIndex<T>&
|
||||
chunk_scalar_index(FieldId field_id, int64_t chunk_id) const {
|
||||
@ -306,11 +347,26 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
bool ignore_non_pk,
|
||||
bool fill_ids) const;
|
||||
|
||||
// return whether field mmap or not
|
||||
virtual bool
|
||||
is_mmap_field(FieldId field_id) const = 0;
|
||||
|
||||
protected:
|
||||
// internal API: return chunk_data in span
|
||||
virtual SpanBase
|
||||
chunk_data_impl(FieldId field_id, int64_t chunk_id) const = 0;
|
||||
|
||||
// internal API: return chunk string views in vector
|
||||
virtual std::vector<std::string_view>
|
||||
chunk_view_impl(FieldId field_id, int64_t chunk_id) const = 0;
|
||||
|
||||
// internal API: return buffer reference to field chunk data located from start_offset
|
||||
virtual BufferView
|
||||
get_chunk_buffer(FieldId field_id,
|
||||
int64_t chunk_id,
|
||||
int64_t start_offset,
|
||||
int64_t length) const = 0;
|
||||
|
||||
// internal API: return chunk_index in span, support scalar index only
|
||||
virtual const index::IndexBase*
|
||||
chunk_index_impl(FieldId field_id, int64_t chunk_id) const = 0;
|
||||
|
@ -526,27 +526,17 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
|
||||
auto data_type = field_meta.get_data_type();
|
||||
|
||||
// write the field data to disk
|
||||
FieldDataPtr field_data;
|
||||
uint64_t total_written = 0;
|
||||
std::vector<uint64_t> indices{};
|
||||
std::vector<std::vector<uint64_t>> element_indices{};
|
||||
FieldDataPtr field_data;
|
||||
size_t total_written = 0;
|
||||
while (data.channel->pop(field_data)) {
|
||||
auto written =
|
||||
WriteFieldData(file, data_type, field_data, element_indices);
|
||||
|
||||
AssertInfo(written == field_data->Size(),
|
||||
fmt::format("failed to write data file {}, written {} but "
|
||||
"total {}, err: {}",
|
||||
filepath.c_str(),
|
||||
written,
|
||||
field_data->Size(),
|
||||
strerror(errno)));
|
||||
|
||||
for (auto i = 0; i < field_data->get_num_rows(); i++) {
|
||||
auto size = field_data->Size(i);
|
||||
indices.emplace_back(total_written);
|
||||
total_written += size;
|
||||
}
|
||||
WriteFieldData(file,
|
||||
data_type,
|
||||
field_data,
|
||||
total_written,
|
||||
indices,
|
||||
element_indices);
|
||||
}
|
||||
|
||||
auto num_rows = data.row_count;
|
||||
@ -596,6 +586,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
|
||||
{
|
||||
std::unique_lock lck(mutex_);
|
||||
fields_.emplace(field_id, column);
|
||||
mmap_fields_.insert(field_id);
|
||||
}
|
||||
|
||||
auto ok = unlink(filepath.c_str());
|
||||
@ -667,6 +658,29 @@ SegmentSealedImpl::size_per_chunk() const {
|
||||
return get_row_count();
|
||||
}
|
||||
|
||||
BufferView
|
||||
SegmentSealedImpl::get_chunk_buffer(FieldId field_id,
|
||||
int64_t chunk_id,
|
||||
int64_t start_offset,
|
||||
int64_t length) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
|
||||
"Can't get bitset element at " + std::to_string(field_id.get()));
|
||||
auto& field_meta = schema_->operator[](field_id);
|
||||
if (auto it = fields_.find(field_id); it != fields_.end()) {
|
||||
auto& field_data = it->second;
|
||||
return field_data->GetBatchBuffer(start_offset, length);
|
||||
}
|
||||
PanicInfo(ErrorCode::UnexpectedError,
|
||||
"get_chunk_buffer only used for variable column field");
|
||||
}
|
||||
|
||||
bool
|
||||
SegmentSealedImpl::is_mmap_field(FieldId field_id) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
return mmap_fields_.find(field_id) != mmap_fields_.end();
|
||||
}
|
||||
|
||||
SpanBase
|
||||
SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
@ -683,6 +697,20 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
|
||||
return field_data->get_span_base(0);
|
||||
}
|
||||
|
||||
std::vector<std::string_view>
|
||||
SegmentSealedImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
|
||||
"Can't get bitset element at " + std::to_string(field_id.get()));
|
||||
auto& field_meta = schema_->operator[](field_id);
|
||||
if (auto it = fields_.find(field_id); it != fields_.end()) {
|
||||
auto& field_data = it->second;
|
||||
return field_data->StringViews();
|
||||
}
|
||||
PanicInfo(ErrorCode::UnexpectedError,
|
||||
"chunk_view_impl only used for variable column field ");
|
||||
}
|
||||
|
||||
const index::IndexBase*
|
||||
SegmentSealedImpl::chunk_index_impl(FieldId field_id, int64_t chunk_id) const {
|
||||
AssertInfo(scalar_indexings_.find(field_id) != scalar_indexings_.end(),
|
||||
|
@ -149,6 +149,9 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count) const override;
|
||||
|
||||
bool
|
||||
is_mmap_field(FieldId id) const override;
|
||||
|
||||
void
|
||||
ClearData();
|
||||
|
||||
@ -157,6 +160,15 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
SpanBase
|
||||
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;
|
||||
|
||||
std::vector<std::string_view>
|
||||
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;
|
||||
|
||||
BufferView
|
||||
get_chunk_buffer(FieldId field_id,
|
||||
int64_t chunk_id,
|
||||
int64_t start_offset,
|
||||
int64_t length) const override;
|
||||
|
||||
const index::IndexBase*
|
||||
chunk_index_impl(FieldId field_id, int64_t chunk_id) const override;
|
||||
|
||||
@ -307,6 +319,7 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
SchemaPtr schema_;
|
||||
int64_t id_;
|
||||
std::unordered_map<FieldId, std::shared_ptr<ColumnBase>> fields_;
|
||||
std::unordered_set<FieldId> mmap_fields_;
|
||||
|
||||
// only useful in binlog
|
||||
IndexMetaPtr col_index_meta_;
|
||||
|
@ -4225,7 +4225,7 @@ TEST(CApiTest, SealedSegment_Update_Field_Size) {
|
||||
int64_t total_size = 0;
|
||||
for (int i = 0; i < N; ++i) {
|
||||
auto str = "string_data_" + std::to_string(i);
|
||||
total_size += str.size();
|
||||
total_size += str.size() + sizeof(uint32_t);
|
||||
str_datas.emplace_back(str);
|
||||
}
|
||||
auto res = LoadFieldRawData(segment, str_fid.get(), str_datas.data(), N);
|
||||
|
@ -498,7 +498,8 @@ TEST(Sealed, LoadFieldData) {
|
||||
ASSERT_EQ(segment->num_chunk_index(str_id), 0);
|
||||
auto chunk_span1 = segment->chunk_data<int64_t>(counter_id, 0);
|
||||
auto chunk_span2 = segment->chunk_data<double>(double_id, 0);
|
||||
auto chunk_span3 = segment->chunk_data<std::string_view>(str_id, 0);
|
||||
auto chunk_span3 =
|
||||
segment->get_batch_views<std::string_view>(str_id, 0, 0, N);
|
||||
auto ref1 = dataset.get_col<int64_t>(counter_id);
|
||||
auto ref2 = dataset.get_col<double>(double_id);
|
||||
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
|
||||
@ -624,7 +625,8 @@ TEST(Sealed, ClearData) {
|
||||
ASSERT_EQ(segment->num_chunk_index(str_id), 0);
|
||||
auto chunk_span1 = segment->chunk_data<int64_t>(counter_id, 0);
|
||||
auto chunk_span2 = segment->chunk_data<double>(double_id, 0);
|
||||
auto chunk_span3 = segment->chunk_data<std::string_view>(str_id, 0);
|
||||
auto chunk_span3 =
|
||||
segment->get_batch_views<std::string_view>(str_id, 0, 0, N);
|
||||
auto ref1 = dataset.get_col<int64_t>(counter_id);
|
||||
auto ref2 = dataset.get_col<double>(double_id);
|
||||
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
|
||||
@ -726,7 +728,8 @@ TEST(Sealed, LoadFieldDataMmap) {
|
||||
ASSERT_EQ(segment->num_chunk_index(str_id), 0);
|
||||
auto chunk_span1 = segment->chunk_data<int64_t>(counter_id, 0);
|
||||
auto chunk_span2 = segment->chunk_data<double>(double_id, 0);
|
||||
auto chunk_span3 = segment->chunk_data<std::string_view>(str_id, 0);
|
||||
auto chunk_span3 =
|
||||
segment->get_batch_views<std::string_view>(str_id, 0, 0, N);
|
||||
auto ref1 = dataset.get_col<int64_t>(counter_id);
|
||||
auto ref2 = dataset.get_col<double>(double_id);
|
||||
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
|
||||
|
Loading…
Reference in New Issue
Block a user