mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
fix: [2.4] improve mmap related code in Column.h (#36521)
pr: #36183 issue: #36182 --------- Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
This commit is contained in:
parent
f10806650a
commit
03fba95284
@ -128,10 +128,10 @@ GetDataTypeSize(DataType data_type, int dim = 1) {
|
||||
case DataType::VECTOR_BFLOAT16: {
|
||||
return sizeof(bfloat16) * dim;
|
||||
}
|
||||
// Not supporting VECTOR_SPARSE_FLOAT here intentionally. We can't
|
||||
// easily estimately the size of a sparse float vector. Caller of this
|
||||
// method must handle this case themselves and must not pass
|
||||
// VECTOR_SPARSE_FLOAT data_type.
|
||||
// Not supporting variable length types(such as VECTOR_SPARSE_FLOAT and
|
||||
// VARCHAR) here intentionally. We can't easily estimate the size of
|
||||
// them. Caller of this method must handle this case themselves and must
|
||||
// not pass variable length types to this method.
|
||||
default: {
|
||||
PanicInfo(
|
||||
DataTypeInvalid,
|
||||
|
@ -44,42 +44,99 @@
|
||||
|
||||
namespace milvus {
|
||||
|
||||
/*
|
||||
* If string field's value all empty, need a string padding to avoid
|
||||
* mmap failing because size_ is zero which causing invalid arguement
|
||||
* array has the same problem
|
||||
* TODO: remove it when support NULL value
|
||||
*/
|
||||
constexpr size_t STRING_PADDING = 1;
|
||||
constexpr size_t ARRAY_PADDING = 1;
|
||||
|
||||
constexpr size_t DEFAULT_PK_VRCOL_BLOCK_SIZE = 1;
|
||||
constexpr size_t DEFAULT_MEM_VRCOL_BLOCK_SIZE = 32;
|
||||
constexpr size_t DEFAULT_MMAP_VRCOL_BLOCK_SIZE = 256;
|
||||
|
||||
/**
|
||||
* ColumnBase and its subclasses are designed to store and retrieve the raw data
|
||||
* of a field.
|
||||
*
|
||||
* It has 3 types of constructors corresponding to 3 MappingTypes:
|
||||
*
|
||||
* 1. MAP_WITH_ANONYMOUS: ColumnBase(size_t reserve_size, const FieldMeta& field_meta)
|
||||
*
|
||||
* This is used when we store the entire data in memory. Upon return, a piece
|
||||
* of unwritten memory is allocated and the caller can fill the memory with data by
|
||||
* calling AppendBatch/Append.
|
||||
*
|
||||
* 2. MAP_WITH_FILE: ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
|
||||
*
|
||||
* This is used when the raw data has already been written into a file, and we
|
||||
* simply mmap the file to memory and interpret the memory as a column. In this
|
||||
* mode, since the data is already in the file/mmapped memory, calling AppendBatch
|
||||
* and Append is not allowed.
|
||||
*
|
||||
* 3. MAP_WITH_MANAGER: ColumnBase(size_t reserve,
|
||||
* const DataType& data_type,
|
||||
* storage::MmapChunkManagerPtr mcm,
|
||||
* storage::MmapChunkDescriptorPtr descriptor)
|
||||
*
|
||||
* This is used when we want to mmap but don't want to download all the data at once.
|
||||
* Instead, we download the data in chunks, cache and mmap each chunk as a single
|
||||
* ColumnBase. Upon return, a piece of unwritten mmaped memory is allocated by the chunk
|
||||
* manager, and the caller should fill the memory with data by calling AppendBatch
|
||||
* and Append.
|
||||
*
|
||||
* - Types with fixed length can use the Column subclass directly.
|
||||
* - Types with variable lengths:
|
||||
* - SparseFloatColumn:
|
||||
* - To store sparse float vectors.
|
||||
* - All 3 modes are supported.
|
||||
* - VariableColumn:
|
||||
* - To store string like types such as VARCHAR and JSON.
|
||||
* - MAP_WITH_MANAGER is not yet supported(as of 2024.09.11).
|
||||
* - ArrayColumn:
|
||||
* - To store ARRAY types.
|
||||
* - MAP_WITH_MANAGER is not yet supported(as of 2024.09.11).
|
||||
*
|
||||
*/
|
||||
class ColumnBase {
|
||||
/**
|
||||
* - data_ points at a piece of memory of size data_cap_size_ + padding_.
|
||||
* Based on mapping_type_, such memory can be:
|
||||
* - an anonymous memory region, allocated by mmap(MAP_ANON)
|
||||
* - a file-backed memory region, mapped by mmap(MAP_FILE)
|
||||
* - a memory region managed by MmapChunkManager, allocated by
|
||||
* MmapChunkManager::Allocate()
|
||||
*
|
||||
* Memory Layout of `data_`:
|
||||
*
|
||||
* |<-- data_cap_size_ -->|<-- padding_ -->|
|
||||
* |<-- data_size_ -->|<-- free space -->|
|
||||
*
|
||||
* AppendBatch/Append should first check if there's enough space for new data.
|
||||
* If not, call Expand() to expand the space.
|
||||
*
|
||||
* - only the first data_cap_size_ bytes can be used to store actual data.
|
||||
* - padding at the end is to ensure when all values are empty, we don't try
|
||||
* to allocate/mmap 0 bytes memory, which will cause mmap() to fail.
|
||||
* - data_size_ is the number of bytes currently used to store actual data.
|
||||
* - num_rows_ is the number of rows currently stored.
|
||||
*
|
||||
*/
|
||||
public:
|
||||
enum MappingType {
|
||||
enum class MappingType {
|
||||
MAP_WITH_ANONYMOUS = 0,
|
||||
MAP_WITH_FILE = 1,
|
||||
MAP_WITH_MANAGER = 2,
|
||||
};
|
||||
// memory mode ctor
|
||||
ColumnBase(size_t reserve, const FieldMeta& field_meta)
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
ColumnBase(size_t reserve_rows, const FieldMeta& field_meta)
|
||||
: mapping_type_(MappingType::MAP_WITH_ANONYMOUS) {
|
||||
auto data_type = field_meta.get_data_type();
|
||||
SetPaddingSize(data_type);
|
||||
|
||||
// We don't pre-allocate memory for variable length data type, data_
|
||||
// will be allocated by Expand() when AppendBatch/Append is called.
|
||||
if (IsVariableDataType(data_type)) {
|
||||
return;
|
||||
}
|
||||
|
||||
type_size_ = field_meta.get_sizeof();
|
||||
|
||||
cap_size_ = type_size_ * reserve;
|
||||
data_cap_size_ = field_meta.get_sizeof() * reserve_rows;
|
||||
|
||||
// use anon mapping so we are able to free these memory with munmap only
|
||||
size_t mapped_size = cap_size_ + padding_;
|
||||
size_t mapped_size = data_cap_size_ + padding_;
|
||||
data_ = static_cast<char*>(mmap(nullptr,
|
||||
mapped_size,
|
||||
PROT_READ | PROT_WRITE,
|
||||
@ -94,51 +151,57 @@ class ColumnBase {
|
||||
UpdateMetricWhenMmap(mapped_size);
|
||||
}
|
||||
|
||||
// use mmap manager ctor, used in growing segment fixed data type
|
||||
// MAP_WITH_MANAGER ctor
|
||||
// reserve is number of bytes to allocate(without padding)
|
||||
ColumnBase(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: mcm_(mcm),
|
||||
mmap_descriptor_(descriptor),
|
||||
num_rows_(0),
|
||||
size_(0),
|
||||
cap_size_(reserve),
|
||||
mapping_type_(MAP_WITH_MANAGER) {
|
||||
data_size_(0),
|
||||
data_cap_size_(reserve),
|
||||
mapping_type_(MappingType::MAP_WITH_MANAGER) {
|
||||
AssertInfo((mcm != nullptr) && descriptor != nullptr,
|
||||
"use wrong mmap chunk manager and mmap chunk descriptor to "
|
||||
"create column.");
|
||||
|
||||
if (!IsVariableDataType(data_type)) {
|
||||
type_size_ = GetDataTypeSize(data_type, dim);
|
||||
if (IsVariableDataType(data_type)) {
|
||||
AssertInfo(data_type == DataType::VECTOR_SPARSE_FLOAT,
|
||||
"MAP_WITH_MANAGER mode only support VECTOR_SPARSE_FLOAT "
|
||||
"if data type is variable length.");
|
||||
}
|
||||
SetPaddingSize(data_type);
|
||||
size_t mapped_size = cap_size_ + padding_;
|
||||
size_t mapped_size = data_cap_size_ + padding_;
|
||||
data_ = (char*)mcm_->Allocate(mmap_descriptor_, (uint64_t)mapped_size);
|
||||
AssertInfo(data_ != nullptr,
|
||||
"fail to create with mmap manager: map_size = {}",
|
||||
mapped_size);
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// User must call Seal to build the view for variable length column.
|
||||
// !!! The incoming file must be write padings at the end of the file.
|
||||
// MAP_WITH_FILE ctor
|
||||
// size is number of bytes of the file, with padding
|
||||
// !!! The incoming file must have padding written at the end of the file.
|
||||
// Subclasses of variable length data type, if they used this constructor,
|
||||
// must set num_rows_ by themselves.
|
||||
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
|
||||
: mapping_type_(MappingType::MAP_WITH_FILE) {
|
||||
auto data_type = field_meta.get_data_type();
|
||||
SetPaddingSize(data_type);
|
||||
if (!IsVariableDataType(data_type)) {
|
||||
type_size_ = field_meta.get_sizeof();
|
||||
num_rows_ = size / type_size_;
|
||||
auto type_size = field_meta.get_sizeof();
|
||||
num_rows_ = size / type_size;
|
||||
}
|
||||
AssertInfo(size >= padding_,
|
||||
"file size {} is less than padding size {}",
|
||||
size,
|
||||
padding_);
|
||||
|
||||
size_ = size;
|
||||
cap_size_ = size - padding_;
|
||||
// in MAP_WITH_FILE, no extra space written in file, so data_size_ is
|
||||
// the same as data_cap_size_.
|
||||
data_size_ = size - padding_;
|
||||
data_cap_size_ = data_size_;
|
||||
// use exactly same size of file, padding shall be written in file already
|
||||
// see also https://github.com/milvus-io/milvus/issues/34442
|
||||
data_ = static_cast<char*>(
|
||||
@ -151,65 +214,21 @@ class ColumnBase {
|
||||
UpdateMetricWhenMmap(size);
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// User must call Seal to build the view for variable length column.
|
||||
// !!! The incoming file must be write padings at the end of the file.
|
||||
ColumnBase(const File& file,
|
||||
size_t size,
|
||||
int dim,
|
||||
const DataType& data_type)
|
||||
: size_(size), mapping_type_(MappingType::MAP_WITH_FILE) {
|
||||
SetPaddingSize(data_type);
|
||||
|
||||
// use exact same size of file, padding shall be written in file already
|
||||
// see also https://github.com/milvus-io/milvus/issues/34442
|
||||
if (!IsVariableDataType(data_type)) {
|
||||
type_size_ = GetDataTypeSize(data_type, dim);
|
||||
num_rows_ = size / type_size_;
|
||||
}
|
||||
AssertInfo(size >= padding_,
|
||||
"file size {} is less than padding size {}",
|
||||
size,
|
||||
padding_);
|
||||
|
||||
cap_size_ = size - padding_;
|
||||
|
||||
data_ = static_cast<char*>(
|
||||
mmap(nullptr, size, PROT_READ, MAP_SHARED, file.Descriptor(), 0));
|
||||
AssertInfo(data_ != MAP_FAILED,
|
||||
"failed to create file-backed map, err: {}",
|
||||
strerror(errno));
|
||||
|
||||
UpdateMetricWhenMmap(size);
|
||||
}
|
||||
|
||||
virtual ~ColumnBase() {
|
||||
if (data_ != nullptr) {
|
||||
size_t mapped_size = data_cap_size_ + padding_;
|
||||
if (mapping_type_ != MappingType::MAP_WITH_MANAGER) {
|
||||
size_t mapped_size = cap_size_ + padding_;
|
||||
if (munmap(data_, mapped_size)) {
|
||||
AssertInfo(true,
|
||||
"failed to unmap variable field, err={}",
|
||||
strerror(errno));
|
||||
}
|
||||
}
|
||||
UpdateMetricWhenMunmap(cap_size_ + padding_);
|
||||
UpdateMetricWhenMunmap(data_cap_size_ + padding_);
|
||||
}
|
||||
}
|
||||
|
||||
ColumnBase(ColumnBase&& column) noexcept
|
||||
: data_(column.data_),
|
||||
cap_size_(column.cap_size_),
|
||||
padding_(column.padding_),
|
||||
type_size_(column.type_size_),
|
||||
num_rows_(column.num_rows_),
|
||||
size_(column.size_) {
|
||||
column.data_ = nullptr;
|
||||
column.cap_size_ = 0;
|
||||
column.padding_ = 0;
|
||||
column.num_rows_ = 0;
|
||||
column.size_ = 0;
|
||||
}
|
||||
ColumnBase(ColumnBase&&) = delete;
|
||||
|
||||
// Data() points at an addr that contains the elements
|
||||
virtual const char*
|
||||
@ -228,16 +247,16 @@ class ColumnBase {
|
||||
return num_rows_;
|
||||
};
|
||||
|
||||
virtual size_t
|
||||
ByteSize() const {
|
||||
return cap_size_ + padding_;
|
||||
// returns the number of bytes used to store actual data
|
||||
size_t
|
||||
DataByteSize() const {
|
||||
return data_size_;
|
||||
}
|
||||
|
||||
// The capacity of the column,
|
||||
// DO NOT call this for variable length column(including SparseFloatColumn).
|
||||
virtual size_t
|
||||
Capacity() const {
|
||||
return cap_size_ / type_size_;
|
||||
// returns the ballpark number of bytes used by this object
|
||||
size_t
|
||||
MemoryUsageBytes() const {
|
||||
return data_cap_size_ + padding_;
|
||||
}
|
||||
|
||||
virtual SpanBase
|
||||
@ -258,38 +277,33 @@ class ColumnBase {
|
||||
|
||||
virtual void
|
||||
AppendBatch(const FieldDataPtr data) {
|
||||
size_t required_size = size_ + data->Size();
|
||||
if (required_size > cap_size_) {
|
||||
Expand(required_size * 2 + padding_);
|
||||
size_t required_size = data_size_ + data->Size();
|
||||
if (required_size > data_cap_size_) {
|
||||
Expand(required_size * 2);
|
||||
}
|
||||
|
||||
std::copy_n(static_cast<const char*>(data->Data()),
|
||||
data->Size(),
|
||||
data_ + size_);
|
||||
size_ = required_size;
|
||||
data_ + data_size_);
|
||||
data_size_ = required_size;
|
||||
num_rows_ += data->Length();
|
||||
}
|
||||
|
||||
// Append one row
|
||||
virtual void
|
||||
Append(const char* data, size_t size) {
|
||||
size_t required_size = size_ + size;
|
||||
if (required_size > cap_size_) {
|
||||
size_t required_size = data_size_ + size;
|
||||
if (required_size > data_cap_size_) {
|
||||
Expand(required_size * 2);
|
||||
}
|
||||
|
||||
std::copy_n(data, size, data_ + size_);
|
||||
size_ = required_size;
|
||||
std::copy_n(data, size, data_ + data_size_);
|
||||
data_size_ = required_size;
|
||||
num_rows_++;
|
||||
}
|
||||
|
||||
void
|
||||
SetPaddingSize(const DataType& type) {
|
||||
padding_ = PaddingSize(type);
|
||||
}
|
||||
|
||||
protected:
|
||||
// only for memory mode and mmap manager mode, not mmap
|
||||
// new_size should not include padding, padding will be added in Expand()
|
||||
void
|
||||
Expand(size_t new_size) {
|
||||
if (new_size == 0) {
|
||||
@ -299,15 +313,15 @@ class ColumnBase {
|
||||
mapping_type_ == MappingType::MAP_WITH_ANONYMOUS ||
|
||||
mapping_type_ == MappingType::MAP_WITH_MANAGER,
|
||||
"expand function only use in anonymous or with mmap manager");
|
||||
size_t new_mapped_size = new_size + padding_;
|
||||
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
|
||||
size_t new_mapped_size = new_size + padding_;
|
||||
auto data = static_cast<char*>(mmap(nullptr,
|
||||
new_mapped_size,
|
||||
PROT_READ | PROT_WRITE,
|
||||
MAP_PRIVATE | MAP_ANON,
|
||||
-1,
|
||||
0));
|
||||
UpdateMetricWhenMmap(true, new_mapped_size);
|
||||
UpdateMetricWhenMmap(new_mapped_size);
|
||||
|
||||
AssertInfo(data != MAP_FAILED,
|
||||
"failed to expand map: {}, new_map_size={}",
|
||||
@ -315,70 +329,73 @@ class ColumnBase {
|
||||
new_size + padding_);
|
||||
|
||||
if (data_ != nullptr) {
|
||||
std::memcpy(data, data_, size_);
|
||||
if (munmap(data_, cap_size_ + padding_)) {
|
||||
std::memcpy(data, data_, data_size_);
|
||||
if (munmap(data_, data_cap_size_ + padding_)) {
|
||||
auto err = errno;
|
||||
size_t mapped_size = new_size + padding_;
|
||||
munmap(data, mapped_size);
|
||||
UpdateMetricWhenMunmap(mapped_size);
|
||||
|
||||
// TODO: error handling is problematic:
|
||||
// if munmap fails, exception will be thrown and caught by
|
||||
// the cgo call, but the program continue to run. and the
|
||||
// successfully newly mmaped data will not be assigned to data_
|
||||
// and got leaked.
|
||||
AssertInfo(
|
||||
false,
|
||||
"failed to unmap while expanding: {}, old_map_size={}",
|
||||
strerror(err),
|
||||
cap_size_ + padding_);
|
||||
data_cap_size_ + padding_);
|
||||
}
|
||||
UpdateMetricWhenMunmap(cap_size_ + padding_);
|
||||
UpdateMetricWhenMunmap(data_cap_size_ + padding_);
|
||||
}
|
||||
|
||||
data_ = data;
|
||||
cap_size_ = new_size;
|
||||
mapping_type_ = MappingType::MAP_WITH_ANONYMOUS;
|
||||
data_cap_size_ = new_size;
|
||||
} else if (mapping_type_ == MappingType::MAP_WITH_MANAGER) {
|
||||
size_t new_mapped_size = new_size + padding_;
|
||||
auto data = mcm_->Allocate(mmap_descriptor_, new_mapped_size);
|
||||
AssertInfo(data != nullptr,
|
||||
"fail to create with mmap manager: map_size = {}",
|
||||
new_mapped_size);
|
||||
std::memcpy(data, data_, size_);
|
||||
std::memcpy(data, data_, data_size_);
|
||||
// allocate space only append in one growing segment, so no need to munmap()
|
||||
data_ = (char*)data;
|
||||
cap_size_ = new_size;
|
||||
mapping_type_ = MappingType::MAP_WITH_MANAGER;
|
||||
data_cap_size_ = new_size;
|
||||
}
|
||||
}
|
||||
|
||||
char* data_{nullptr};
|
||||
// capacity in bytes
|
||||
size_t cap_size_{0};
|
||||
size_t data_cap_size_{0};
|
||||
size_t padding_{0};
|
||||
// type_size_ is not used for sparse float vector column.
|
||||
size_t type_size_{1};
|
||||
size_t num_rows_{0};
|
||||
|
||||
// length in bytes
|
||||
size_t size_{0};
|
||||
size_t data_size_{0};
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
|
||||
const MappingType mapping_type_;
|
||||
|
||||
private:
|
||||
void
|
||||
UpdateMetricWhenMmap(size_t mmaped_size) {
|
||||
UpdateMetricWhenMmap(mapping_type_, mmaped_size);
|
||||
SetPaddingSize(const DataType& type) {
|
||||
padding_ = PaddingSize(type);
|
||||
}
|
||||
|
||||
void
|
||||
UpdateMetricWhenMmap(bool is_map_anonymous, size_t mapped_size) {
|
||||
UpdateMetricWhenMmap(size_t mapped_size) {
|
||||
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
|
||||
milvus::monitor::internal_mmap_allocated_space_bytes_anon.Observe(
|
||||
mapped_size);
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_anon.Increment(
|
||||
mapped_size);
|
||||
} else {
|
||||
} else if (mapping_type_ == MappingType::MAP_WITH_FILE) {
|
||||
milvus::monitor::internal_mmap_allocated_space_bytes_file.Observe(
|
||||
mapped_size);
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_file.Increment(
|
||||
mapped_size);
|
||||
}
|
||||
// else: does not update metric for MAP_WITH_MANAGER, MmapChunkManagerPtr
|
||||
// will update metric itself.
|
||||
}
|
||||
|
||||
void
|
||||
@ -386,170 +403,153 @@ class ColumnBase {
|
||||
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_anon.Decrement(
|
||||
mapped_size);
|
||||
} else {
|
||||
} else if (mapping_type_ == MappingType::MAP_WITH_FILE) {
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_file.Decrement(
|
||||
mapped_size);
|
||||
}
|
||||
// else: does not update metric for MAP_WITH_MANAGER, MmapChunkManagerPtr
|
||||
// will update metric itself.
|
||||
}
|
||||
|
||||
private:
|
||||
// mapping_type_
|
||||
MappingType mapping_type_;
|
||||
storage::MmapChunkManagerPtr mcm_ = nullptr;
|
||||
};
|
||||
|
||||
class Column : public ColumnBase {
|
||||
public:
|
||||
// memory mode ctor
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
Column(size_t cap, const FieldMeta& field_meta)
|
||||
: ColumnBase(cap, field_meta) {
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// MAP_WITH_FILE ctor
|
||||
Column(const File& file, size_t size, const FieldMeta& field_meta)
|
||||
: ColumnBase(file, size, field_meta) {
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
Column(const File& file, size_t size, int dim, DataType data_type)
|
||||
: ColumnBase(file, size, dim, data_type) {
|
||||
}
|
||||
|
||||
// MAP_WITH_MANAGER ctor
|
||||
Column(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
|
||||
}
|
||||
|
||||
Column(Column&& column) noexcept : ColumnBase(std::move(column)) {
|
||||
: ColumnBase(reserve, data_type, mcm, descriptor) {
|
||||
}
|
||||
|
||||
~Column() override = default;
|
||||
|
||||
SpanBase
|
||||
Span() const override {
|
||||
return SpanBase(data_, num_rows_, cap_size_ / num_rows_);
|
||||
return SpanBase(data_, num_rows_, data_cap_size_ / num_rows_);
|
||||
}
|
||||
};
|
||||
|
||||
// when mmap is used, size_, data_ and num_rows_ of ColumnBase are used.
|
||||
class SparseFloatColumn : public ColumnBase {
|
||||
public:
|
||||
// memory mode ctor
|
||||
SparseFloatColumn(const FieldMeta& field_meta) : ColumnBase(0, field_meta) {
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
SparseFloatColumn(const FieldMeta& field_meta)
|
||||
: ColumnBase(/*reserve_rows= */ 0, field_meta) {
|
||||
}
|
||||
// mmap mode ctor
|
||||
|
||||
// MAP_WITH_FILE ctor
|
||||
SparseFloatColumn(const File& file,
|
||||
size_t size,
|
||||
const FieldMeta& field_meta)
|
||||
const FieldMeta& field_meta,
|
||||
std::vector<uint64_t>&& indices = {})
|
||||
: ColumnBase(file, size, field_meta) {
|
||||
}
|
||||
// mmap mode ctor
|
||||
SparseFloatColumn(const File& file,
|
||||
size_t size,
|
||||
int dim,
|
||||
const DataType& data_type)
|
||||
: ColumnBase(file, size, dim, data_type) {
|
||||
}
|
||||
// mmap with mmap manager
|
||||
SparseFloatColumn(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
|
||||
}
|
||||
|
||||
SparseFloatColumn(SparseFloatColumn&& column) noexcept
|
||||
: ColumnBase(std::move(column)),
|
||||
dim_(column.dim_),
|
||||
vec_(std::move(column.vec_)) {
|
||||
}
|
||||
|
||||
~SparseFloatColumn() override = default;
|
||||
|
||||
const char*
|
||||
Data() const override {
|
||||
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
|
||||
}
|
||||
|
||||
size_t
|
||||
Capacity() const override {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"Capacity not supported for sparse float column");
|
||||
}
|
||||
|
||||
SpanBase
|
||||
Span() const override {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"Span not supported for sparse float column");
|
||||
}
|
||||
|
||||
void
|
||||
AppendBatch(const FieldDataPtr data) override {
|
||||
auto ptr = static_cast<const knowhere::sparse::SparseRow<float>*>(
|
||||
data->Data());
|
||||
vec_.insert(vec_.end(), ptr, ptr + data->Length());
|
||||
for (size_t i = 0; i < data->Length(); ++i) {
|
||||
dim_ = std::max(dim_, ptr[i].dim());
|
||||
}
|
||||
num_rows_ += data->Length();
|
||||
}
|
||||
|
||||
void
|
||||
AppendBatchMmap(const FieldDataPtr data) {
|
||||
AssertInfo(data->Size() + size_ <= cap_size_,
|
||||
"append batch mmap exceed");
|
||||
for (size_t i = 0; i < data->get_num_rows(); ++i) {
|
||||
auto vec = static_cast<const knowhere::sparse::SparseRow<float>*>(
|
||||
data->RawValue(i));
|
||||
memcpy(data_ + size_, vec->data(), vec->data_byte_size());
|
||||
size_ += vec->data_byte_size();
|
||||
dim_ = std::max(dim_, vec->dim());
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Append(const char* data, size_t size) override {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"Append not supported for sparse float column");
|
||||
}
|
||||
|
||||
int64_t
|
||||
Dim() const {
|
||||
return dim_;
|
||||
}
|
||||
|
||||
void
|
||||
Seal(std::vector<uint64_t> indices) {
|
||||
AssertInfo(!indices.empty(),
|
||||
"indices should not be empty, Seal() of "
|
||||
"SparseFloatColumn must be called only "
|
||||
"at mmap mode");
|
||||
AssertInfo(data_,
|
||||
"data_ should not be nullptr, Seal() of "
|
||||
"SparseFloatColumn must be called only "
|
||||
"at mmap mode");
|
||||
"SparseFloatColumn indices should not be empty.");
|
||||
num_rows_ = indices.size();
|
||||
// so that indices[num_rows_] - indices[num_rows_ - 1] is the size of
|
||||
// so that indices[num_rows_] - indices[num_rows_ - 1] is the byte size of
|
||||
// the last row.
|
||||
indices.push_back(size_);
|
||||
indices.push_back(data_size_);
|
||||
dim_ = 0;
|
||||
for (size_t i = 0; i < num_rows_; i++) {
|
||||
auto vec_size = indices[i + 1] - indices[i];
|
||||
AssertInfo(
|
||||
vec_size % knowhere::sparse::SparseRow<float>::element_size() ==
|
||||
0,
|
||||
"Incorrect sparse vector size: {}",
|
||||
"Incorrect sparse vector byte size: {}",
|
||||
vec_size);
|
||||
vec_.emplace_back(
|
||||
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
|
||||
(uint8_t*)(data_) + indices[i],
|
||||
false);
|
||||
dim_ = std::max(dim_, vec_.back().dim());
|
||||
}
|
||||
}
|
||||
|
||||
// MAP_WITH_MANAGER ctor
|
||||
SparseFloatColumn(storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(
|
||||
/*reserve= */ 0, DataType::VECTOR_SPARSE_FLOAT, mcm, descriptor) {
|
||||
}
|
||||
|
||||
~SparseFloatColumn() override = default;
|
||||
|
||||
// returned pointer points at a list of knowhere::sparse::SparseRow<float>
|
||||
const char*
|
||||
Data() const override {
|
||||
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
|
||||
}
|
||||
|
||||
SpanBase
|
||||
Span() const override {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"SparseFloatColumn::Span() not supported");
|
||||
}
|
||||
|
||||
void
|
||||
AppendBatch(const FieldDataPtr data) override {
|
||||
AssertInfo(
|
||||
mapping_type_ != MappingType::MAP_WITH_FILE,
|
||||
"SparseFloatColumn::AppendBatch not supported for MAP_WITH_FILE");
|
||||
|
||||
size_t required_size = data_size_ + data->Size();
|
||||
if (required_size > data_cap_size_) {
|
||||
Expand(required_size * 2);
|
||||
// after expanding, the address of each row in vec_ become invalid.
|
||||
// the number of elements of each row is still correct, update the
|
||||
// address of each row to the new data_.
|
||||
size_t bytes = 0;
|
||||
for (size_t i = 0; i < num_rows_; i++) {
|
||||
auto count = vec_[i].size();
|
||||
auto row_bytes = vec_[i].data_byte_size();
|
||||
// destroy the old object and placement new a new one
|
||||
vec_[i].~SparseRow<float>();
|
||||
new (&vec_[i]) knowhere::sparse::SparseRow<float>(
|
||||
count, (uint8_t*)(data_) + bytes, false);
|
||||
bytes += row_bytes;
|
||||
}
|
||||
}
|
||||
dim_ = std::max(
|
||||
dim_,
|
||||
std::static_pointer_cast<FieldDataSparseVectorImpl>(data)->Dim());
|
||||
|
||||
auto ptr = static_cast<const knowhere::sparse::SparseRow<float>*>(
|
||||
data->Data());
|
||||
|
||||
for (size_t i = 0; i < data->Length(); ++i) {
|
||||
auto row_bytes = ptr[i].data_byte_size();
|
||||
std::memcpy(data_ + data_size_, ptr[i].data(), row_bytes);
|
||||
vec_.emplace_back(
|
||||
ptr[i].size(), (uint8_t*)(data_) + data_size_, false);
|
||||
data_size_ += row_bytes;
|
||||
}
|
||||
num_rows_ += data->Length();
|
||||
}
|
||||
|
||||
void
|
||||
Append(const char* data, size_t size) override {
|
||||
PanicInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"SparseFloatColumn::Append not supported, use AppendBatch instead");
|
||||
}
|
||||
|
||||
int64_t
|
||||
Dim() const {
|
||||
return dim_;
|
||||
}
|
||||
|
||||
private:
|
||||
int64_t dim_ = 0;
|
||||
std::vector<knowhere::sparse::SparseRow<float>> vec_;
|
||||
@ -561,32 +561,20 @@ class VariableColumn : public ColumnBase {
|
||||
using ViewType =
|
||||
std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;
|
||||
|
||||
// memory mode ctor
|
||||
VariableColumn(size_t cap, const FieldMeta& field_meta, size_t block_size)
|
||||
: ColumnBase(cap, field_meta), block_size_(block_size) {
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
VariableColumn(size_t reserve_rows,
|
||||
const FieldMeta& field_meta,
|
||||
size_t block_size)
|
||||
: ColumnBase(reserve_rows, field_meta), block_size_(block_size) {
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// MAP_WITH_FILE ctor
|
||||
VariableColumn(const File& file,
|
||||
size_t size,
|
||||
const FieldMeta& field_meta,
|
||||
size_t block_size)
|
||||
: ColumnBase(file, size, field_meta), block_size_(block_size) {
|
||||
}
|
||||
// mmap with mmap manager
|
||||
VariableColumn(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor,
|
||||
size_t block_size)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor),
|
||||
block_size_(block_size) {
|
||||
}
|
||||
|
||||
VariableColumn(VariableColumn&& column) noexcept
|
||||
: ColumnBase(std::move(column)), indices_(std::move(column.indices_)) {
|
||||
}
|
||||
|
||||
~VariableColumn() override = default;
|
||||
|
||||
@ -638,7 +626,7 @@ class VariableColumn : public ColumnBase {
|
||||
pos += sizeof(uint32_t) + size;
|
||||
}
|
||||
|
||||
return BufferView{pos, size_ - (pos - data_)};
|
||||
return BufferView{pos, data_size_ - (pos - data_)};
|
||||
}
|
||||
|
||||
ViewType
|
||||
@ -670,9 +658,9 @@ class VariableColumn : public ColumnBase {
|
||||
void
|
||||
Append(FieldDataPtr chunk) {
|
||||
for (auto i = 0; i < chunk->get_num_rows(); i++) {
|
||||
indices_.emplace_back(size_);
|
||||
indices_.emplace_back(data_size_);
|
||||
auto data = static_cast<const T*>(chunk->RawValue(i));
|
||||
size_ += sizeof(uint32_t) + data->size();
|
||||
data_size_ += sizeof(uint32_t) + data->size();
|
||||
}
|
||||
load_buf_.emplace(std::move(chunk));
|
||||
}
|
||||
@ -687,8 +675,8 @@ class VariableColumn : public ColumnBase {
|
||||
|
||||
// for variable length column in memory mode only
|
||||
if (data_ == nullptr) {
|
||||
size_t total_size = size_;
|
||||
size_ = 0;
|
||||
size_t total_size = data_size_;
|
||||
data_size_ = 0;
|
||||
Expand(total_size);
|
||||
|
||||
while (!load_buf_.empty()) {
|
||||
@ -698,11 +686,13 @@ class VariableColumn : public ColumnBase {
|
||||
// data_ as: |size|data|size|data......
|
||||
for (auto i = 0; i < chunk->get_num_rows(); i++) {
|
||||
auto current_size = (uint32_t)chunk->Size(i);
|
||||
std::memcpy(data_ + size_, ¤t_size, sizeof(uint32_t));
|
||||
size_ += sizeof(uint32_t);
|
||||
std::memcpy(
|
||||
data_ + data_size_, ¤t_size, sizeof(uint32_t));
|
||||
data_size_ += sizeof(uint32_t);
|
||||
auto data = static_cast<const T*>(chunk->RawValue(i));
|
||||
std::memcpy(data_ + size_, data->c_str(), data->size());
|
||||
size_ += data->size();
|
||||
std::memcpy(
|
||||
data_ + data_size_, data->c_str(), data->size());
|
||||
data_size_ += data->size();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -734,33 +724,18 @@ class VariableColumn : public ColumnBase {
|
||||
|
||||
class ArrayColumn : public ColumnBase {
|
||||
public:
|
||||
// memory mode ctor
|
||||
ArrayColumn(size_t num_rows, const FieldMeta& field_meta)
|
||||
: ColumnBase(num_rows, field_meta),
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
ArrayColumn(size_t reserve_rows, const FieldMeta& field_meta)
|
||||
: ColumnBase(reserve_rows, field_meta),
|
||||
element_type_(field_meta.get_element_type()) {
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// MAP_WITH_FILE ctor
|
||||
ArrayColumn(const File& file, size_t size, const FieldMeta& field_meta)
|
||||
: ColumnBase(file, size, field_meta),
|
||||
element_type_(field_meta.get_element_type()) {
|
||||
}
|
||||
|
||||
ArrayColumn(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
|
||||
}
|
||||
|
||||
ArrayColumn(ArrayColumn&& column) noexcept
|
||||
: ColumnBase(std::move(column)),
|
||||
indices_(std::move(column.indices_)),
|
||||
views_(std::move(column.views_)),
|
||||
element_type_(column.element_type_) {
|
||||
}
|
||||
|
||||
~ArrayColumn() override = default;
|
||||
|
||||
SpanBase
|
||||
@ -785,7 +760,7 @@ class ArrayColumn : public ColumnBase {
|
||||
|
||||
void
|
||||
Append(const Array& array) {
|
||||
indices_.emplace_back(size_);
|
||||
indices_.emplace_back(data_size_);
|
||||
element_indices_.emplace_back(array.get_offsets());
|
||||
ColumnBase::Append(static_cast<const char*>(array.data()),
|
||||
array.byte_size());
|
||||
@ -798,6 +773,7 @@ class ArrayColumn : public ColumnBase {
|
||||
indices_ = std::move(indices);
|
||||
element_indices_ = std::move(element_indices);
|
||||
}
|
||||
num_rows_ = indices_.size();
|
||||
ConstructViews();
|
||||
}
|
||||
|
||||
@ -812,7 +788,7 @@ class ArrayColumn : public ColumnBase {
|
||||
std::move(element_indices_[i]));
|
||||
}
|
||||
views_.emplace_back(data_ + indices_.back(),
|
||||
size_ - indices_.back(),
|
||||
data_size_ - indices_.back(),
|
||||
element_type_,
|
||||
std::move(element_indices_[indices_.size() - 1]));
|
||||
element_indices_.clear();
|
||||
|
@ -41,12 +41,13 @@ namespace milvus {
|
||||
|
||||
/*
|
||||
* If string field's value all empty, need a string padding to avoid
|
||||
* mmap failing because size_ is zero which causing invalid arguement
|
||||
* mmap failing because size_ is zero which causing invalid argument
|
||||
* array has the same problem
|
||||
* TODO: remove it when support NULL value
|
||||
*/
|
||||
constexpr size_t FILE_STRING_PADDING = 1;
|
||||
constexpr size_t FILE_ARRAY_PADDING = 1;
|
||||
constexpr size_t SPARSE_FLOAT_PADDING = 4;
|
||||
|
||||
inline size_t
|
||||
PaddingSize(const DataType& type) {
|
||||
@ -60,6 +61,8 @@ PaddingSize(const DataType& type) {
|
||||
break;
|
||||
case DataType::ARRAY:
|
||||
return FILE_ARRAY_PADDING;
|
||||
case DataType::VECTOR_SPARSE_FLOAT:
|
||||
return SPARSE_FLOAT_PADDING;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -401,8 +401,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
|
||||
var_column->Append(std::move(field_data));
|
||||
}
|
||||
var_column->Seal();
|
||||
field_data_size = var_column->ByteSize();
|
||||
stats_.mem_size += var_column->ByteSize();
|
||||
field_data_size = var_column->DataByteSize();
|
||||
stats_.mem_size += var_column->MemoryUsageBytes();
|
||||
LoadStringSkipIndex(field_id, 0, *var_column);
|
||||
column = std::move(var_column);
|
||||
break;
|
||||
@ -416,8 +416,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
|
||||
var_column->Append(std::move(field_data));
|
||||
}
|
||||
var_column->Seal();
|
||||
stats_.mem_size += var_column->ByteSize();
|
||||
field_data_size = var_column->ByteSize();
|
||||
stats_.mem_size += var_column->MemoryUsageBytes();
|
||||
field_data_size = var_column->DataByteSize();
|
||||
column = std::move(var_column);
|
||||
break;
|
||||
}
|
||||
@ -583,8 +583,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
|
||||
}
|
||||
case milvus::DataType::VECTOR_SPARSE_FLOAT: {
|
||||
auto sparse_column = std::make_shared<SparseFloatColumn>(
|
||||
file, total_written, field_meta);
|
||||
sparse_column->Seal(std::move(indices));
|
||||
file, total_written, field_meta, std::move(indices));
|
||||
column = std::move(sparse_column);
|
||||
break;
|
||||
}
|
||||
@ -1001,10 +1000,10 @@ SegmentSealedImpl::get_vector(FieldId field_id,
|
||||
"column not found");
|
||||
const auto& column = path_to_column.at(data_path);
|
||||
AssertInfo(
|
||||
offset_in_binlog * row_bytes < column->ByteSize(),
|
||||
offset_in_binlog < column->NumRows(),
|
||||
"column idx out of range, idx: {}, size: {}, data_path: {}",
|
||||
offset_in_binlog * row_bytes,
|
||||
column->ByteSize(),
|
||||
offset_in_binlog,
|
||||
column->NumRows(),
|
||||
data_path);
|
||||
auto vector = &column->Data()[offset_in_binlog * row_bytes];
|
||||
std::memcpy(buf.data() + i * row_bytes, vector, row_bytes);
|
||||
|
@ -14,9 +14,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "ChunkCache.h"
|
||||
#include <future>
|
||||
#include <memory>
|
||||
|
||||
#include "ChunkCache.h"
|
||||
#include "common/Types.h"
|
||||
|
||||
namespace milvus::storage {
|
||||
@ -65,7 +66,7 @@ ChunkCache::Read(const std::string& filepath,
|
||||
if (mmap_rss_not_need) {
|
||||
auto ok = madvise(reinterpret_cast<void*>(
|
||||
const_cast<char*>(column->MmappedData())),
|
||||
column->ByteSize(),
|
||||
column->DataByteSize(),
|
||||
ReadAheadPolicy_Map["dontneed"]);
|
||||
if (ok != 0) {
|
||||
LOG_WARN(
|
||||
@ -74,7 +75,7 @@ ChunkCache::Read(const std::string& filepath,
|
||||
"{}",
|
||||
filepath,
|
||||
static_cast<const void*>(column->MmappedData()),
|
||||
column->ByteSize(),
|
||||
column->DataByteSize(),
|
||||
strerror(errno));
|
||||
}
|
||||
}
|
||||
@ -121,14 +122,14 @@ ChunkCache::Prefetch(const std::string& filepath) {
|
||||
auto column = it->second.second.get();
|
||||
auto ok = madvise(
|
||||
reinterpret_cast<void*>(const_cast<char*>(column->MmappedData())),
|
||||
column->ByteSize(),
|
||||
column->DataByteSize(),
|
||||
read_ahead_policy_);
|
||||
if (ok != 0) {
|
||||
LOG_WARN(
|
||||
"failed to madvise to the data file {}, addr {}, size {}, err: {}",
|
||||
filepath,
|
||||
static_cast<const void*>(column->MmappedData()),
|
||||
column->ByteSize(),
|
||||
column->DataByteSize(),
|
||||
strerror(errno));
|
||||
}
|
||||
}
|
||||
@ -136,7 +137,6 @@ ChunkCache::Prefetch(const std::string& filepath) {
|
||||
std::shared_ptr<ColumnBase>
|
||||
ChunkCache::Mmap(const FieldDataPtr& field_data,
|
||||
const MmapChunkDescriptorPtr& descriptor) {
|
||||
auto dim = field_data->get_dim();
|
||||
auto data_type = field_data->get_data_type();
|
||||
|
||||
auto data_size = field_data->Size();
|
||||
@ -144,25 +144,15 @@ ChunkCache::Mmap(const FieldDataPtr& field_data,
|
||||
std::shared_ptr<ColumnBase> column{};
|
||||
|
||||
if (IsSparseFloatVectorDataType(data_type)) {
|
||||
std::vector<uint64_t> indices{};
|
||||
uint64_t offset = 0;
|
||||
for (auto i = 0; i < field_data->get_num_rows(); ++i) {
|
||||
indices.push_back(offset);
|
||||
offset += field_data->Size(i);
|
||||
}
|
||||
auto sparse_column = std::make_shared<SparseFloatColumn>(
|
||||
data_size, dim, data_type, mcm_, descriptor);
|
||||
sparse_column->AppendBatchMmap(field_data);
|
||||
sparse_column->Seal(std::move(indices));
|
||||
column = std::move(sparse_column);
|
||||
column = std::make_shared<SparseFloatColumn>(mcm_, descriptor);
|
||||
} else if (IsVariableDataType(data_type)) {
|
||||
AssertInfo(
|
||||
false, "TODO: unimplemented for variable data type: {}", data_type);
|
||||
} else {
|
||||
column = std::make_shared<Column>(
|
||||
data_size, dim, data_type, mcm_, descriptor);
|
||||
column->AppendBatch(field_data);
|
||||
column =
|
||||
std::make_shared<Column>(data_size, data_type, mcm_, descriptor);
|
||||
}
|
||||
column->AppendBatch(field_data);
|
||||
return column;
|
||||
}
|
||||
} // namespace milvus::storage
|
||||
|
@ -27,7 +27,7 @@
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
|
||||
#define DEFAULT_READ_AHEAD_POLICY "willneed"
|
||||
class ChunkCacheTest : public testing::Test {
|
||||
class ChunkCacheTest : public testing::TestWithParam</*mmap enabled*/ bool> {
|
||||
public:
|
||||
void
|
||||
SetUp() override {
|
||||
@ -38,7 +38,8 @@ class ChunkCacheTest : public testing::Test {
|
||||
TearDown() override {
|
||||
mcm->UnRegister(descriptor);
|
||||
}
|
||||
const char* file_name = "chunk_cache_test/insert_log/2/101/1000000";
|
||||
const char* dense_file_name = "chunk_cache_test/insert_log/2/101/1000000";
|
||||
const char* sparse_file_name = "chunk_cache_test/insert_log/2/102/1000000";
|
||||
milvus::storage::MmapChunkManagerPtr mcm;
|
||||
milvus::segcore::SegcoreConfig config;
|
||||
milvus::storage::MmapChunkDescriptorPtr descriptor =
|
||||
@ -47,100 +48,210 @@ class ChunkCacheTest : public testing::Test {
|
||||
{101, SegmentType::Sealed}));
|
||||
};
|
||||
|
||||
TEST_F(ChunkCacheTest, Read) {
|
||||
INSTANTIATE_TEST_SUITE_P(ChunkCacheTestSuite,
|
||||
ChunkCacheTest,
|
||||
testing::Values(true, false));
|
||||
|
||||
TEST_P(ChunkCacheTest, Read) {
|
||||
auto N = 10000;
|
||||
auto dim = 128;
|
||||
auto metric_type = knowhere::metric::L2;
|
||||
auto dense_metric_type = knowhere::metric::L2;
|
||||
auto sparse_metric_type = knowhere::metric::IP;
|
||||
|
||||
auto schema = std::make_shared<milvus::Schema>();
|
||||
auto fake_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type);
|
||||
auto fake_dense_vec_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type);
|
||||
auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64);
|
||||
auto fake_sparse_vec_id =
|
||||
schema->AddDebugField("fakevec_sparse",
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
sparse_metric_type);
|
||||
schema->set_primary_field_id(i64_fid);
|
||||
|
||||
auto dataset = milvus::segcore::DataGen(schema, N);
|
||||
|
||||
auto field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_id.get()};
|
||||
auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"),
|
||||
fake_id,
|
||||
milvus::DataType::VECTOR_FLOAT,
|
||||
dim,
|
||||
metric_type);
|
||||
auto dense_field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()};
|
||||
auto sparse_field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()};
|
||||
auto dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"),
|
||||
fake_dense_vec_id,
|
||||
milvus::DataType::VECTOR_FLOAT,
|
||||
dim,
|
||||
dense_metric_type);
|
||||
auto sparse_field_meta =
|
||||
milvus::FieldMeta(milvus::FieldName("fakevec_sparse"),
|
||||
fake_sparse_vec_id,
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
sparse_metric_type);
|
||||
|
||||
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
|
||||
.GetChunkManager();
|
||||
auto data = dataset.get_col<float>(fake_id);
|
||||
auto data_slices = std::vector<void*>{data.data()};
|
||||
auto dense_data = dataset.get_col<float>(fake_dense_vec_id);
|
||||
auto sparse_data =
|
||||
dataset.get_col<knowhere::sparse::SparseRow<float>>(fake_sparse_vec_id);
|
||||
|
||||
auto data_slices = std::vector<void*>{dense_data.data()};
|
||||
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
|
||||
auto slice_names = std::vector<std::string>{file_name};
|
||||
auto slice_names = std::vector<std::string>{dense_file_name};
|
||||
PutFieldData(lcm.get(),
|
||||
data_slices,
|
||||
slice_sizes,
|
||||
slice_names,
|
||||
field_data_meta,
|
||||
field_meta);
|
||||
dense_field_data_meta,
|
||||
dense_field_meta);
|
||||
|
||||
data_slices = std::vector<void*>{sparse_data.data()};
|
||||
slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
|
||||
slice_names = std::vector<std::string>{sparse_file_name};
|
||||
PutFieldData(lcm.get(),
|
||||
data_slices,
|
||||
slice_sizes,
|
||||
slice_names,
|
||||
sparse_field_data_meta,
|
||||
sparse_field_meta);
|
||||
|
||||
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
|
||||
const auto& column = cc->Read(file_name, descriptor);
|
||||
Assert(column->ByteSize() == dim * N * 4);
|
||||
|
||||
auto actual = (float*)column->Data();
|
||||
for (auto i = 0; i < N; i++) {
|
||||
AssertInfo(data[i] == actual[i],
|
||||
fmt::format("expect {}, actual {}", data[i], actual[i]));
|
||||
// validate dense data
|
||||
const auto& dense_column = cc->Read(dense_file_name, descriptor);
|
||||
Assert(dense_column->DataByteSize() == dim * N * 4);
|
||||
auto actual_dense = (const float*)(dense_column->Data());
|
||||
for (auto i = 0; i < N * dim; i++) {
|
||||
AssertInfo(dense_data[i] == actual_dense[i],
|
||||
fmt::format(
|
||||
"expect {}, actual {}", dense_data[i], actual_dense[i]));
|
||||
}
|
||||
|
||||
cc->Remove(file_name);
|
||||
lcm->Remove(file_name);
|
||||
// validate sparse data
|
||||
const auto& sparse_column = cc->Read(sparse_file_name, descriptor);
|
||||
auto expected_sparse_size = 0;
|
||||
auto actual_sparse =
|
||||
(const knowhere::sparse::SparseRow<float>*)(sparse_column->Data());
|
||||
for (auto i = 0; i < N; i++) {
|
||||
const auto& actual_sparse_row = actual_sparse[i];
|
||||
const auto& expect_sparse_row = sparse_data[i];
|
||||
AssertInfo(
|
||||
actual_sparse_row.size() == expect_sparse_row.size(),
|
||||
fmt::format("Incorrect size of sparse row: expect {}, actual {}",
|
||||
expect_sparse_row.size(),
|
||||
actual_sparse_row.size()));
|
||||
auto bytes = actual_sparse_row.data_byte_size();
|
||||
AssertInfo(
|
||||
memcmp(actual_sparse_row.data(), expect_sparse_row.data(), bytes) ==
|
||||
0,
|
||||
fmt::format("Incorrect data of sparse row: expect {}, actual {}",
|
||||
expect_sparse_row.data(),
|
||||
actual_sparse_row.data()));
|
||||
expected_sparse_size += bytes;
|
||||
}
|
||||
|
||||
ASSERT_EQ(sparse_column->DataByteSize(), expected_sparse_size);
|
||||
|
||||
cc->Remove(dense_file_name);
|
||||
cc->Remove(sparse_file_name);
|
||||
lcm->Remove(dense_file_name);
|
||||
lcm->Remove(sparse_file_name);
|
||||
}
|
||||
|
||||
TEST_F(ChunkCacheTest, TestMultithreads) {
|
||||
TEST_P(ChunkCacheTest, TestMultithreads) {
|
||||
auto N = 1000;
|
||||
auto dim = 128;
|
||||
auto metric_type = knowhere::metric::L2;
|
||||
auto dense_metric_type = knowhere::metric::L2;
|
||||
auto sparse_metric_type = knowhere::metric::IP;
|
||||
|
||||
auto schema = std::make_shared<milvus::Schema>();
|
||||
auto fake_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type);
|
||||
auto fake_dense_vec_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type);
|
||||
auto fake_sparse_vec_id =
|
||||
schema->AddDebugField("fakevec_sparse",
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
sparse_metric_type);
|
||||
auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64);
|
||||
schema->set_primary_field_id(i64_fid);
|
||||
|
||||
auto dataset = milvus::segcore::DataGen(schema, N);
|
||||
|
||||
auto field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_id.get()};
|
||||
auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"),
|
||||
fake_id,
|
||||
milvus::DataType::VECTOR_FLOAT,
|
||||
dim,
|
||||
metric_type);
|
||||
auto dense_field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()};
|
||||
auto sparse_field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()};
|
||||
auto dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"),
|
||||
fake_dense_vec_id,
|
||||
milvus::DataType::VECTOR_FLOAT,
|
||||
dim,
|
||||
dense_metric_type);
|
||||
auto sparse_field_meta =
|
||||
milvus::FieldMeta(milvus::FieldName("fakevec_sparse"),
|
||||
fake_sparse_vec_id,
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
sparse_metric_type);
|
||||
|
||||
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
|
||||
.GetChunkManager();
|
||||
auto data = dataset.get_col<float>(fake_id);
|
||||
auto data_slices = std::vector<void*>{data.data()};
|
||||
auto dense_data = dataset.get_col<float>(fake_dense_vec_id);
|
||||
auto sparse_data =
|
||||
dataset.get_col<knowhere::sparse::SparseRow<float>>(fake_sparse_vec_id);
|
||||
|
||||
auto dense_data_slices = std::vector<void*>{dense_data.data()};
|
||||
auto sparse_data_slices = std::vector<void*>{sparse_data.data()};
|
||||
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
|
||||
auto slice_names = std::vector<std::string>{file_name};
|
||||
auto dense_slice_names = std::vector<std::string>{dense_file_name};
|
||||
auto sparse_slice_names = std::vector<std::string>{sparse_file_name};
|
||||
|
||||
PutFieldData(lcm.get(),
|
||||
data_slices,
|
||||
dense_data_slices,
|
||||
slice_sizes,
|
||||
slice_names,
|
||||
field_data_meta,
|
||||
field_meta);
|
||||
dense_slice_names,
|
||||
dense_field_data_meta,
|
||||
dense_field_meta);
|
||||
|
||||
PutFieldData(lcm.get(),
|
||||
sparse_data_slices,
|
||||
slice_sizes,
|
||||
sparse_slice_names,
|
||||
sparse_field_data_meta,
|
||||
sparse_field_meta);
|
||||
|
||||
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
|
||||
|
||||
constexpr int threads = 16;
|
||||
std::vector<int64_t> total_counts(threads);
|
||||
auto executor = [&](int thread_id) {
|
||||
const auto& column = cc->Read(file_name, descriptor);
|
||||
Assert(column->ByteSize() == dim * N * 4);
|
||||
const auto& dense_column = cc->Read(dense_file_name, descriptor);
|
||||
Assert(dense_column->DataByteSize() == dim * N * 4);
|
||||
|
||||
auto actual = (float*)column->Data();
|
||||
auto actual_dense = (const float*)dense_column->Data();
|
||||
for (auto i = 0; i < N * dim; i++) {
|
||||
AssertInfo(
|
||||
dense_data[i] == actual_dense[i],
|
||||
fmt::format(
|
||||
"expect {}, actual {}", dense_data[i], actual_dense[i]));
|
||||
}
|
||||
|
||||
const auto& sparse_column = cc->Read(sparse_file_name, descriptor);
|
||||
auto actual_sparse =
|
||||
(const knowhere::sparse::SparseRow<float>*)sparse_column->Data();
|
||||
for (auto i = 0; i < N; i++) {
|
||||
AssertInfo(data[i] == actual[i],
|
||||
fmt::format("expect {}, actual {}", data[i], actual[i]));
|
||||
const auto& actual_sparse_row = actual_sparse[i];
|
||||
const auto& expect_sparse_row = sparse_data[i];
|
||||
AssertInfo(actual_sparse_row.size() == expect_sparse_row.size(),
|
||||
fmt::format(
|
||||
"Incorrect size of sparse row: expect {}, actual {}",
|
||||
expect_sparse_row.size(),
|
||||
actual_sparse_row.size()));
|
||||
auto bytes = actual_sparse_row.data_byte_size();
|
||||
AssertInfo(memcmp(actual_sparse_row.data(),
|
||||
expect_sparse_row.data(),
|
||||
bytes) == 0,
|
||||
fmt::format(
|
||||
"Incorrect data of sparse row: expect {}, actual {}",
|
||||
expect_sparse_row.data(),
|
||||
actual_sparse_row.data()));
|
||||
}
|
||||
};
|
||||
std::vector<std::thread> pool;
|
||||
@ -151,6 +262,8 @@ TEST_F(ChunkCacheTest, TestMultithreads) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
cc->Remove(file_name);
|
||||
lcm->Remove(file_name);
|
||||
cc->Remove(dense_file_name);
|
||||
cc->Remove(sparse_file_name);
|
||||
lcm->Remove(dense_file_name);
|
||||
lcm->Remove(sparse_file_name);
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "storage/MmapManager.h"
|
||||
#include "storage/MinioChunkManager.h"
|
||||
#include "storage/RemoteChunkManagerSingleton.h"
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
#include "storage/Util.h"
|
||||
#include "test_utils/DataGen.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
@ -1469,9 +1470,6 @@ TEST(Sealed, GetVectorFromChunkCache) {
|
||||
}
|
||||
|
||||
TEST(Sealed, GetSparseVectorFromChunkCache) {
|
||||
// skip test due to mem leak from AWS::InitSDK
|
||||
return;
|
||||
|
||||
auto dim = 16;
|
||||
auto topK = 5;
|
||||
auto N = ROW_COUNT;
|
||||
@ -1483,9 +1481,8 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
|
||||
auto file_name = std::string(
|
||||
"sealed_test_get_vector_from_chunk_cache/insert_log/1/101/1000000");
|
||||
|
||||
auto sc = milvus::storage::StorageConfig{};
|
||||
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc);
|
||||
auto mcm = std::make_unique<milvus::storage::MinioChunkManager>(sc);
|
||||
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
|
||||
.GetChunkManager();
|
||||
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto fakevec_id = schema->AddDebugField(
|
||||
@ -1502,19 +1499,27 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
|
||||
auto dataset = DataGen(schema, N);
|
||||
auto field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fakevec_id.get()};
|
||||
auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"),
|
||||
auto field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"),
|
||||
fakevec_id,
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
metric_type);
|
||||
|
||||
auto rcm = milvus::storage::RemoteChunkManagerSingleton::GetInstance()
|
||||
.GetRemoteChunkManager();
|
||||
auto data = dataset.get_col<knowhere::sparse::SparseRow<float>>(fakevec_id);
|
||||
auto data_slices = std::vector<void*>{data.data()};
|
||||
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
|
||||
auto slice_names = std::vector<std::string>{file_name};
|
||||
PutFieldData(rcm.get(),
|
||||
|
||||
// write to multiple files for better coverage
|
||||
auto data_slices = std::vector<void*>();
|
||||
auto slice_sizes = std::vector<int64_t>();
|
||||
auto slice_names = std::vector<std::string>();
|
||||
|
||||
const int64_t slice_size = (N + 9) / 10;
|
||||
for (int64_t i = 0; i < N; i += slice_size) {
|
||||
int64_t current_slice_size = std::min(slice_size, N - i);
|
||||
data_slices.push_back(data.data() + i);
|
||||
slice_sizes.push_back(current_slice_size);
|
||||
slice_names.push_back(file_name + "_" + std::to_string(i / slice_size));
|
||||
}
|
||||
PutFieldData(lcm.get(),
|
||||
data_slices,
|
||||
slice_sizes,
|
||||
slice_names,
|
||||
@ -1538,11 +1543,7 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
|
||||
segment_sealed->LoadIndex(vec_info);
|
||||
|
||||
auto field_binlog_info =
|
||||
FieldBinlogInfo{fakevec_id.get(),
|
||||
N,
|
||||
std::vector<int64_t>{N},
|
||||
false,
|
||||
std::vector<std::string>{file_name}};
|
||||
FieldBinlogInfo{fakevec_id.get(), N, slice_sizes, false, slice_names};
|
||||
segment_sealed->AddFieldDataInfoForSealed(
|
||||
LoadFieldDataInfo{std::map<int64_t, FieldBinlogInfo>{
|
||||
{fakevec_id.get(), field_binlog_info}}});
|
||||
@ -1569,9 +1570,11 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
|
||||
"sparse float vector doesn't match");
|
||||
}
|
||||
|
||||
rcm->Remove(file_name);
|
||||
auto exist = rcm->Exist(file_name);
|
||||
Assert(!exist);
|
||||
for (const auto& name : slice_names) {
|
||||
lcm->Remove(name);
|
||||
auto exist = lcm->Exist(name);
|
||||
Assert(!exist);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(Sealed, WarmupChunkCache) {
|
||||
|
Loading…
Reference in New Issue
Block a user