Add attributes to MySQL and WAL (#2229)

* Add hybrid operations to MySQL

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Add hybrid support to WAL

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Add unit tests for hybrid WAL

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Fix WAL unit test bug

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Fix hybrid_insert unit test bugs

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Fix codacy/PR quality

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Change test_rpc port to 19531

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
yukun 2020-05-07 14:14:58 +08:00 committed by GitHub
parent 6de141b2af
commit 1516d71a55
16 changed files with 1293 additions and 304 deletions


@@ -131,7 +131,6 @@ DBImpl::Start() {
if (record.type == wal::MXLogType::None) {
break;
}
ExecWalRecord(record);
}
@@ -227,8 +226,9 @@ DBImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema, meta::
}
meta::CollectionSchema temp_schema = collection_schema;
temp_schema.index_file_size_ *= MB;
if (options_.wal_enable_) {
// TODO(yukun): wal_mgr_->CreateHybridCollection()
temp_schema.flush_lsn_ = wal_mgr_->CreateHybridCollection(collection_schema.collection_id_);
}
return meta_ptr_->CreateHybridCollection(temp_schema, fields_schema);
@@ -606,6 +606,127 @@ DBImpl::InsertVectors(const std::string& collection_id, const std::string& parti
return status;
}
Status
CopyToAttr(std::vector<uint8_t>& record, uint64_t row_num, const std::vector<std::string>& field_names,
std::unordered_map<std::string, meta::hybrid::DataType>& attr_types,
std::unordered_map<std::string, std::vector<uint8_t>>& attr_datas,
std::unordered_map<std::string, uint64_t>& attr_nbytes,
std::unordered_map<std::string, uint64_t>& attr_data_size) {
uint64_t offset = 0;
for (auto name : field_names) {
switch (attr_types.at(name)) {
case meta::hybrid::DataType::INT8: {
std::vector<uint8_t> data;
data.resize(row_num * sizeof(int8_t));
std::vector<int64_t> attr_value(row_num, 0);
memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
std::vector<int8_t> raw_value(row_num, 0);
for (uint64_t i = 0; i < row_num; ++i) {
raw_value[i] = attr_value[i];
}
memcpy(data.data(), raw_value.data(), row_num * sizeof(int8_t));
attr_datas.insert(std::make_pair(name, data));
attr_nbytes.insert(std::make_pair(name, sizeof(int8_t)));
attr_data_size.insert(std::make_pair(name, row_num * sizeof(int8_t)));
offset += row_num * sizeof(int64_t);
break;
}
case meta::hybrid::DataType::INT16: {
std::vector<uint8_t> data;
data.resize(row_num * sizeof(int16_t));
std::vector<int64_t> attr_value(row_num, 0);
memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
std::vector<int16_t> raw_value(row_num, 0);
for (uint64_t i = 0; i < row_num; ++i) {
raw_value[i] = attr_value[i];
}
memcpy(data.data(), raw_value.data(), row_num * sizeof(int16_t));
attr_datas.insert(std::make_pair(name, data));
attr_nbytes.insert(std::make_pair(name, sizeof(int16_t)));
attr_data_size.insert(std::make_pair(name, row_num * sizeof(int16_t)));
offset += row_num * sizeof(int64_t);
break;
}
case meta::hybrid::DataType::INT32: {
std::vector<uint8_t> data;
data.resize(row_num * sizeof(int32_t));
std::vector<int64_t> attr_value(row_num, 0);
memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
std::vector<int32_t> raw_value(row_num, 0);
for (uint64_t i = 0; i < row_num; ++i) {
raw_value[i] = attr_value[i];
}
memcpy(data.data(), raw_value.data(), row_num * sizeof(int32_t));
attr_datas.insert(std::make_pair(name, data));
attr_nbytes.insert(std::make_pair(name, sizeof(int32_t)));
attr_data_size.insert(std::make_pair(name, row_num * sizeof(int32_t)));
offset += row_num * sizeof(int64_t);
break;
}
case meta::hybrid::DataType::INT64: {
std::vector<uint8_t> data;
data.resize(row_num * sizeof(int64_t));
memcpy(data.data(), record.data() + offset, row_num * sizeof(int64_t));
attr_datas.insert(std::make_pair(name, data));
attr_nbytes.insert(std::make_pair(name, sizeof(int64_t)));
attr_data_size.insert(std::make_pair(name, row_num * sizeof(int64_t)));
offset += row_num * sizeof(int64_t);
break;
}
case meta::hybrid::DataType::FLOAT: {
std::vector<uint8_t> data;
data.resize(row_num * sizeof(float));
std::vector<double> attr_value(row_num, 0);
memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(double));
std::vector<float> raw_value(row_num, 0);
for (uint64_t i = 0; i < row_num; ++i) {
raw_value[i] = attr_value[i];
}
memcpy(data.data(), raw_value.data(), row_num * sizeof(float));
attr_datas.insert(std::make_pair(name, data));
attr_nbytes.insert(std::make_pair(name, sizeof(float)));
attr_data_size.insert(std::make_pair(name, row_num * sizeof(float)));
offset += row_num * sizeof(double);
break;
}
case meta::hybrid::DataType::DOUBLE: {
std::vector<uint8_t> data;
data.resize(row_num * sizeof(double));
memcpy(data.data(), record.data() + offset, row_num * sizeof(double));
attr_datas.insert(std::make_pair(name, data));
attr_nbytes.insert(std::make_pair(name, sizeof(double)));
attr_data_size.insert(std::make_pair(name, row_num * sizeof(double)));
offset += row_num * sizeof(double);
break;
}
default:
break;
}
}
return Status::OK();
}
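For context, the packing convention CopyToAttr expects can be sketched as follows: the caller lays out entity.attr_value_ field by field, each field as a contiguous run of row_num values, with every integer type widened to int64_t and every floating type widened to double. A minimal, hypothetical packer (not part of this change) would look like:

#include <cstdint>
#include <cstring>
#include <vector>

// Packs one int64-widened field followed by one double-widened field into the
// flat byte buffer that CopyToAttr above consumes via entity.attr_value_.
std::vector<uint8_t>
PackWidenedAttrs(const std::vector<int64_t>& ints, const std::vector<double>& reals) {
    std::vector<uint8_t> buf(ints.size() * sizeof(int64_t) + reals.size() * sizeof(double));
    memcpy(buf.data(), ints.data(), ints.size() * sizeof(int64_t));
    memcpy(buf.data() + ints.size() * sizeof(int64_t), reals.data(), reals.size() * sizeof(double));
    return buf;
}

CopyToAttr then narrows each run back to the field's declared width (INT8/INT16/INT32, FLOAT), which is why offset always advances by sizeof(int64_t) or sizeof(double) regardless of the target type.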
Status
DBImpl::InsertEntities(const std::string& collection_id, const std::string& partition_tag,
const std::vector<std::string>& field_names, Entity& entity,
@@ -624,7 +745,15 @@ DBImpl::InsertEntities(const std::string& collection_id, const std::string& part
}
Status status;
// insert entities: collection_name is field id
std::unordered_map<std::string, std::vector<uint8_t>> attr_data;
std::unordered_map<std::string, uint64_t> attr_nbytes;
std::unordered_map<std::string, uint64_t> attr_data_size;
status = CopyToAttr(entity.attr_value_, entity.entity_count_, field_names, attr_types, attr_data, attr_nbytes,
attr_data_size);
if (!status.ok()) {
return status;
}
wal::MXLogRecord record;
record.lsn = 0;
record.collection_id = collection_id;
@@ -637,123 +766,62 @@ DBImpl::InsertEntities(const std::string& collection_id, const std::string& part
record.type = wal::MXLogType::Entity;
record.data = vector_it->second.float_data_.data();
record.data_size = vector_it->second.float_data_.size() * sizeof(float);
record.attr_data = attr_data;
record.attr_nbytes = attr_nbytes;
record.attr_data_size = attr_data_size;
} else {
// record.type = wal::MXLogType::InsertBinary;
// record.data = entities.vector_data_[0].binary_data_.data();
// record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
}
uint64_t offset = 0;
for (auto field_name : field_names) {
switch (attr_types.at(field_name)) {
case meta::hybrid::DataType::INT8: {
std::vector<uint8_t> data;
data.resize(entity.entity_count_ * sizeof(int8_t));
std::vector<int64_t> attr_value(entity.entity_count_, 0);
memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
offset += entity.entity_count_ * sizeof(int64_t);
std::vector<int8_t> raw_value(entity.entity_count_, 0);
for (uint64_t i = 0; i < entity.entity_count_; ++i) {
raw_value[i] = attr_value[i];
}
memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(int8_t));
record.attr_data.insert(std::make_pair(field_name, data));
record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int8_t)));
record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int8_t)));
break;
}
case meta::hybrid::DataType::INT16: {
std::vector<uint8_t> data;
data.resize(entity.entity_count_ * sizeof(int16_t));
std::vector<int64_t> attr_value(entity.entity_count_, 0);
memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
offset += entity.entity_count_ * sizeof(int64_t);
std::vector<int16_t> raw_value(entity.entity_count_, 0);
for (uint64_t i = 0; i < entity.entity_count_; ++i) {
raw_value[i] = attr_value[i];
}
memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(int16_t));
record.attr_data.insert(std::make_pair(field_name, data));
record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int16_t)));
record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int16_t)));
break;
}
case meta::hybrid::DataType::INT32: {
std::vector<uint8_t> data;
data.resize(entity.entity_count_ * sizeof(int32_t));
std::vector<int64_t> attr_value(entity.entity_count_, 0);
memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
offset += entity.entity_count_ * sizeof(int64_t);
std::vector<int32_t> raw_value(entity.entity_count_, 0);
for (uint64_t i = 0; i < entity.entity_count_; ++i) {
raw_value[i] = attr_value[i];
}
memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(int32_t));
record.attr_data.insert(std::make_pair(field_name, data));
record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int32_t)));
record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int32_t)));
break;
}
case meta::hybrid::DataType::INT64: {
std::vector<uint8_t> data;
data.resize(entity.entity_count_ * sizeof(int64_t));
memcpy(data.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
record.attr_data.insert(std::make_pair(field_name, data));
record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int64_t)));
record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int64_t)));
offset += entity.entity_count_ * sizeof(int64_t);
break;
}
case meta::hybrid::DataType::FLOAT: {
std::vector<uint8_t> data;
data.resize(entity.entity_count_ * sizeof(float));
std::vector<double> attr_value(entity.entity_count_, 0);
memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(double));
offset += entity.entity_count_ * sizeof(double);
std::vector<float> raw_value(entity.entity_count_, 0);
for (uint64_t i = 0; i < entity.entity_count_; ++i) {
raw_value[i] = attr_value[i];
}
memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(float));
record.attr_data.insert(std::make_pair(field_name, data));
record.attr_nbytes.insert(std::make_pair(field_name, sizeof(float)));
record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(float)));
break;
}
case meta::hybrid::DataType::DOUBLE: {
std::vector<uint8_t> data;
data.resize(entity.entity_count_ * sizeof(double));
memcpy(data.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(double));
record.attr_data.insert(std::make_pair(field_name, data));
record.attr_nbytes.insert(std::make_pair(field_name, sizeof(double)));
record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(double)));
offset += entity.entity_count_ * sizeof(double);
break;
}
default:
break;
}
}
status = ExecWalRecord(record);
#if 0
if (options_.wal_enable_) {
std::string target_collection_name;
status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0, status.message().c_str());
return status;
}
auto vector_it = entity.vector_data_.begin();
if (!vector_it->second.binary_data_.empty()) {
wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.binary_data_,
attr_nbytes, attr_data);
} else if (!vector_it->second.float_data_.empty()) {
wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.float_data_,
attr_nbytes, attr_data);
}
swn_wal_.Notify();
} else {
// insert entities: collection_name is field id
wal::MXLogRecord record;
record.lsn = 0;
record.collection_id = collection_id;
record.partition_tag = partition_tag;
record.ids = entity.id_array_.data();
record.length = entity.entity_count_;
auto vector_it = entity.vector_data_.begin();
if (vector_it->second.binary_data_.empty()) {
record.type = wal::MXLogType::Entity;
record.data = vector_it->second.float_data_.data();
record.data_size = vector_it->second.float_data_.size() * sizeof(float);
record.attr_data = attr_data;
record.attr_nbytes = attr_nbytes;
record.attr_data_size = attr_data_size;
} else {
// record.type = wal::MXLogType::InsertBinary;
// record.data = entities.vector_data_[0].binary_data_.data();
// record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
}
status = ExecWalRecord(record);
}
#endif
return status;
}
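A usage sketch, mirroring BuildEntity from the unit test further down (the trailing attr_types parameter is assumed from the function body, since the signature is truncated in this hunk):

// Hypothetical caller: three widened attribute fields plus one float vector field.
milvus::engine::Entity entity;
BuildEntity(n, 0, entity);  // fills id_array_, vector_data_ and the packed attr_value_
std::vector<std::string> field_names = {"field_0", "field_1", "field_2"};
auto status = db->InsertEntities(collection_id, partition_tag, field_names, entity, attr_types);

Note that the WAL-enabled branch is compiled out under #if 0 above, so for now every insert goes through ExecWalRecord() directly.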


@@ -133,26 +133,6 @@ struct VectorFieldsSchema {
};
using VectorFieldSchemaPtr = std::shared_ptr<VectorFieldSchema>;
struct CollectionSchema {
typedef enum {
NORMAL,
TO_DELETE,
} COLLETION_STATE;
size_t id_ = 0;
std::string collection_id_;
int32_t state_ = (int)NORMAL;
int64_t field_num = 0;
int64_t created_on_ = 0;
int64_t flag_ = 0;
std::string owner_collection_;
std::string partition_tag_;
std::string version_ = CURRENT_VERSION;
uint64_t flush_lsn_ = 0;
};
using CollectionSchemaPtr = std::shared_ptr<CollectionSchema>;
struct FieldSchema {
typedef enum {
INT8 = 1,


@@ -184,6 +184,14 @@ static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, {
MetaField("flush_lsn", "BIGINT", "DEFAULT 0 NOT NULL"),
});
// Fields schema
static const MetaSchema FIELDS_SCHEMA(META_FIELDS, {
MetaField("collection_id", "VARCHAR(255)", "NOT NULL"),
MetaField("field_name", "VARCHAR(255)", "NOT NULL"),
MetaField("field_type", "INT", "DEFAULT 0 NOT NULL"),
MetaField("field_params", "VARCHAR(255)", "NOT NULL"),
});
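For reference, the DDL composed from this schema by the Initialize() step further down should look roughly like the following sketch; the physical table name comes from META_FIELDS and is assumed here from the "meta table 'Fields'" error text:

// Approximate statement built from FIELDS_SCHEMA by Initialize()
const char* kFieldsDDL =
    "CREATE TABLE IF NOT EXISTS Fields ("
    " collection_id VARCHAR(255) NOT NULL,"
    " field_name VARCHAR(255) NOT NULL,"
    " field_type INT DEFAULT 0 NOT NULL,"
    " field_params VARCHAR(255) NOT NULL"
    ");";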
} // namespace
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -264,10 +272,15 @@ MySQLMetaImpl::ValidateMetaSchema() {
throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
}
// verufy TableFiles
// verify TableFiles
if (!validate_func(TABLEFILES_SCHEMA)) {
throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
}
// verify Fields
if (!validate_func(FIELDS_SCHEMA)) {
throw Exception(DB_INCOMPATIB_META, "Meta Fields schema is created by milvus old version");
}
}
Status
@@ -380,6 +393,18 @@ MySQLMetaImpl::Initialize() {
throw Exception(DB_META_TRANSACTION_FAILED, msg);
}
// step 10: create meta table Field
InitializeQuery << "CREATE TABLE IF NOT EXISTS " << FIELDS_SCHEMA.name() << " (" << FIELDS_SCHEMA.ToString() + ");";
LOG_ENGINE_DEBUG_ << "Initialize: " << InitializeQuery.str();
initialize_query_exec = InitializeQuery.exec();
if (!initialize_query_exec) {
std::string msg = "Failed to create meta table 'Fields' in MySQL";
LOG_ENGINE_ERROR_ << msg;
throw Exception(DB_META_TRANSACTION_FAILED, msg);
}
return Status::OK();
}
@@ -2641,10 +2666,179 @@ MySQLMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
Status
MySQLMetaImpl::CreateHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) {
try {
server::MetricCollector metric;
{
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
fiu_do_on("MySQLMetaImpl.CreateCollection.null_connection", is_null_connection = true);
fiu_do_on("MySQLMetaImpl.CreateCollection.throw_exception", throw std::exception(););
if (is_null_connection) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query statement = connectionPtr->query();
if (collection_schema.collection_id_.empty()) {
NextCollectionId(collection_schema.collection_id_);
} else {
statement << "SELECT state FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
<< collection_schema.collection_id_ << ";";
LOG_ENGINE_DEBUG_ << "CreateCollection: " << statement.str();
mysqlpp::StoreQueryResult res = statement.store();
if (res.num_rows() == 1) {
int state = res[0]["state"];
fiu_do_on("MySQLMetaImpl.CreateCollection.schema_TO_DELETE", state = CollectionSchema::TO_DELETE);
if (CollectionSchema::TO_DELETE == state) {
return Status(DB_ERROR,
"Collection already exists and it is in delete state, please wait a second");
} else {
return Status(DB_ALREADY_EXIST, "Collection already exists");
}
}
}
collection_schema.id_ = -1;
collection_schema.created_on_ = utils::GetMicroSecTimeStamp();
std::string id = "NULL"; // auto-increment
std::string& collection_id = collection_schema.collection_id_;
std::string state = std::to_string(collection_schema.state_);
std::string dimension = std::to_string(collection_schema.dimension_);
std::string created_on = std::to_string(collection_schema.created_on_);
std::string flag = std::to_string(collection_schema.flag_);
std::string index_file_size = std::to_string(collection_schema.index_file_size_);
std::string engine_type = std::to_string(collection_schema.engine_type_);
std::string& index_params = collection_schema.index_params_;
std::string metric_type = std::to_string(collection_schema.metric_type_);
std::string& owner_collection = collection_schema.owner_collection_;
std::string& partition_tag = collection_schema.partition_tag_;
std::string& version = collection_schema.version_;
std::string flush_lsn = std::to_string(collection_schema.flush_lsn_);
statement << "INSERT INTO " << META_TABLES << " VALUES(" << id << ", " << mysqlpp::quote << collection_id
<< ", " << state << ", " << dimension << ", " << created_on << ", " << flag << ", "
<< index_file_size << ", " << engine_type << ", " << mysqlpp::quote << index_params << ", "
<< metric_type << ", " << mysqlpp::quote << owner_collection << ", " << mysqlpp::quote
<< partition_tag << ", " << mysqlpp::quote << version << ", " << flush_lsn << ");";
LOG_ENGINE_DEBUG_ << "CreateHybridCollection: " << statement.str();
if (mysqlpp::SimpleResult res = statement.execute()) {
collection_schema.id_ = res.insert_id(); // Might need to use SELECT LAST_INSERT_ID()?
// Consume all results to avoid "Commands out of sync" error
} else {
return HandleException("Failed to create collection", statement.error());
}
for (const auto& schema : fields_schema.fields_schema_) {
std::string collection_id = schema.collection_id_;
std::string field_name = schema.field_name_;
std::string field_type = std::to_string(schema.field_type_);
std::string field_params = schema.field_params_;
statement << "INSERT INTO " << META_FIELDS << " VALUES(" << mysqlpp::quote << collection_id << ", "
<< mysqlpp::quote << field_name << ", " << field_type << ", " << mysqlpp::quote << field_params
<< ");";
LOG_ENGINE_DEBUG_ << "Create field: " << statement.str();
if (mysqlpp::SimpleResult field_res = statement.execute()) {
// TODO(yukun): need field id?
} else {
return HandleException("Failed to create field table", statement.error());
}
}
} // Scoped Connection
LOG_ENGINE_DEBUG_ << "Successfully create hybrid collection: " << collection_schema.collection_id_;
return utils::CreateCollectionPath(options_, collection_schema.collection_id_);
} catch (std::exception& e) {
return HandleException("Failed to create collection", e.what());
}
}
Status
MySQLMetaImpl::DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) {
try {
server::MetricCollector metric;
mysqlpp::StoreQueryResult res, field_res;
{
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
fiu_do_on("MySQLMetaImpl.DescribeCollection.null_connection", is_null_connection = true);
fiu_do_on("MySQLMetaImpl.DescribeCollection.throw_exception", throw std::exception(););
if (is_null_connection) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type, index_params"
<< " , metric_type ,owner_table, partition_tag, version, flush_lsn"
<< " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
<< collection_schema.collection_id_ << " AND state <> "
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
LOG_ENGINE_DEBUG_ << "DescribeHybridCollection: " << statement.str();
res = statement.store();
mysqlpp::Query field_statement = connectionPtr->query();
field_statement << "SELECT collection_id, field_name, field_type, field_params"
<< " FROM " << META_FIELDS << " WHERE collection_id = " << mysqlpp::quote
<< collection_schema.collection_id_ << ";";
LOG_ENGINE_DEBUG_ << "Describe Collection Fields: " << field_statement.str();
field_res = field_statement.store();
} // Scoped Connection
if (res.num_rows() == 1) {
const mysqlpp::Row& resRow = res[0];
collection_schema.id_ = resRow["id"]; // implicit conversion
collection_schema.state_ = resRow["state"];
collection_schema.dimension_ = resRow["dimension"];
collection_schema.created_on_ = resRow["created_on"];
collection_schema.flag_ = resRow["flag"];
collection_schema.index_file_size_ = resRow["index_file_size"];
collection_schema.engine_type_ = resRow["engine_type"];
resRow["index_params"].to_string(collection_schema.index_params_);
collection_schema.metric_type_ = resRow["metric_type"];
resRow["owner_table"].to_string(collection_schema.owner_collection_);
resRow["partition_tag"].to_string(collection_schema.partition_tag_);
resRow["version"].to_string(collection_schema.version_);
collection_schema.flush_lsn_ = resRow["flush_lsn"];
} else {
return Status(DB_NOT_FOUND, "Collection " + collection_schema.collection_id_ + " not found");
}
auto num_row = field_res.num_rows();
if (num_row >= 1) {
fields_schema.fields_schema_.resize(num_row);
for (uint64_t i = 0; i < num_row; ++i) {
const mysqlpp::Row& resRow = field_res[i];
resRow["collection_id"].to_string(fields_schema.fields_schema_[i].collection_id_);
resRow["field_name"].to_string(fields_schema.fields_schema_[i].field_name_);
fields_schema.fields_schema_[i].field_type_ = resRow["field_type"];
resRow["field_params"].to_string(fields_schema.fields_schema_[i].field_params_);
}
} else {
return Status(DB_NOT_FOUND, "Fields of " + collection_schema.collection_id_ + " not found");
}
} catch (std::exception& e) {
return HandleException("Failed to describe collection", e.what());
}
return Status::OK();
}
Status

View File

@@ -34,8 +34,9 @@
#include "utils/StringHelpFunctions.h"
#include "utils/ValidationUtil.h"
#define USING_SQLITE_WARNING LOG_ENGINE_WARNING_ << \
"You are using SQLite as the meta data management, which can't be used in production. Please change it to MySQL!";
#define USING_SQLITE_WARNING \
LOG_ENGINE_WARNING_ << "You are using SQLite as the meta data management, which can't be used in production. " \
"Please change it to MySQL!";
namespace milvus {
namespace engine {
@@ -83,59 +84,22 @@ StoragePrototype(const std::string& path) {
make_column("field_name", &hybrid::FieldSchema::field_name_),
make_column("field_type", &hybrid::FieldSchema::field_type_),
make_column("field_params", &hybrid::FieldSchema::field_params_)),
make_table(
META_TABLEFILES, make_column("id", &SegmentSchema::id_, primary_key()),
make_column("table_id", &SegmentSchema::collection_id_),
make_column("segment_id", &SegmentSchema::segment_id_, default_value("")),
make_column("engine_type", &SegmentSchema::engine_type_),
make_column("file_id", &SegmentSchema::file_id_), make_column("file_type", &SegmentSchema::file_type_),
make_column("file_size", &SegmentSchema::file_size_, default_value(0)),
make_column("row_count", &SegmentSchema::row_count_, default_value(0)),
make_column("updated_time", &SegmentSchema::updated_time_),
make_column("created_on", &SegmentSchema::created_on_), make_column("date", &SegmentSchema::date_),
make_column("flush_lsn", &SegmentSchema::flush_lsn_)));
}
inline auto
CollectionPrototype(const std::string& path) {
return make_storage(
path,
make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))),
make_table(META_COLLECTIONS, make_column("id", &hybrid::CollectionSchema::id_, primary_key()),
make_column("collection_id", &hybrid::CollectionSchema::collection_id_, unique()),
make_column("state", &hybrid::CollectionSchema::state_),
make_column("field_num", &hybrid::CollectionSchema::field_num),
make_column("created_on", &hybrid::CollectionSchema::created_on_),
make_column("flag", &hybrid::CollectionSchema::flag_, default_value(0)),
make_column("owner_collection", &hybrid::CollectionSchema::owner_collection_, default_value("")),
make_column("partition_tag", &hybrid::CollectionSchema::partition_tag_, default_value("")),
make_column("version", &hybrid::CollectionSchema::version_, default_value(CURRENT_VERSION)),
make_column("flush_lsn", &hybrid::CollectionSchema::flush_lsn_)),
make_table(META_FIELDS, make_column("collection_id", &hybrid::FieldSchema::collection_id_),
make_column("field_name", &hybrid::FieldSchema::field_name_),
make_column("field_type", &hybrid::FieldSchema::field_type_),
make_column("field_params", &hybrid::FieldSchema::field_params_)),
make_table(
META_COLLECTIONFILES,
make_column("id", &hybrid::CollectionFileSchema::id_, primary_key()),
make_column("collection_id", &hybrid::CollectionFileSchema::collection_id_),
make_column("segment_id", &hybrid::CollectionFileSchema::segment_id_, default_value("")),
make_column("file_id", &hybrid::CollectionFileSchema::file_id_),
make_column("file_type", &hybrid::CollectionFileSchema::file_type_),
make_column("file_size", &hybrid::CollectionFileSchema::file_size_, default_value(0)),
make_column("row_count", &hybrid::CollectionFileSchema::row_count_, default_value(0)),
make_column("updated_time", &hybrid::CollectionFileSchema::updated_time_),
make_column("created_on", &hybrid::CollectionFileSchema::created_on_),
make_column("date", &hybrid::CollectionFileSchema::date_),
make_column("flush_lsn", &hybrid::CollectionFileSchema::flush_lsn_)));
make_table(META_TABLEFILES, make_column("id", &SegmentSchema::id_, primary_key()),
make_column("table_id", &SegmentSchema::collection_id_),
make_column("segment_id", &SegmentSchema::segment_id_, default_value("")),
make_column("engine_type", &SegmentSchema::engine_type_),
make_column("file_id", &SegmentSchema::file_id_),
make_column("file_type", &SegmentSchema::file_type_),
make_column("file_size", &SegmentSchema::file_size_, default_value(0)),
make_column("row_count", &SegmentSchema::row_count_, default_value(0)),
make_column("updated_time", &SegmentSchema::updated_time_),
make_column("created_on", &SegmentSchema::created_on_), make_column("date", &SegmentSchema::date_),
make_column("flush_lsn", &SegmentSchema::flush_lsn_)));
}
using ConnectorT = decltype(StoragePrototype("table"));
static std::unique_ptr<ConnectorT> ConnectorPtr;
using CollectionConnectT = decltype(CollectionPrototype(""));
static std::unique_ptr<CollectionConnectT> CollectionConnectPtr;
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options) : options_(options) {
Initialize();
}
@@ -177,8 +141,8 @@ SqliteMetaImpl::ValidateMetaSchema() {
sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
}
if (ret.find(META_FIELDS) != ret.end()
&& sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_FIELDS]) {
if (ret.find(META_FIELDS) != ret.end() &&
sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_FIELDS]) {
throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
}
if (ret.find(META_TABLEFILES) != ret.end() &&
@@ -187,30 +151,6 @@ SqliteMetaImpl::ValidateMetaSchema() {
}
}
void
SqliteMetaImpl::ValidateCollectionMetaSchema() {
bool is_null_connector{CollectionConnectPtr == nullptr};
fiu_do_on("SqliteMetaImpl.ValidateMetaSchema.NullConnection", is_null_connector = true);
if (is_null_connector) {
return;
}
// old meta could be recreated since schema changed, throw exception if meta schema is not compatible
auto ret = CollectionConnectPtr->sync_schema_simulate();
if (ret.find(META_COLLECTIONS) != ret.end() &&
sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_COLLECTIONS]) {
throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
}
if (ret.find(META_FIELDS) != ret.end()
&& sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_FIELDS]) {
throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
}
if (ret.find(META_COLLECTIONFILES) != ret.end() &&
sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
}
}
Status
SqliteMetaImpl::Initialize() {
if (!boost::filesystem::is_directory(options_.path_)) {
@@ -231,14 +171,6 @@ SqliteMetaImpl::Initialize() {
ConnectorPtr->open_forever(); // thread safe option
ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log
CollectionConnectPtr = std::make_unique<CollectionConnectT>(CollectionPrototype(options_.path_ + "/metah.sqlite"));
ValidateCollectionMetaSchema();
CollectionConnectPtr->sync_schema();
CollectionConnectPtr->open_forever();
CollectionConnectPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log
CleanUpShadowFiles();
return Status::OK();
@@ -656,7 +588,7 @@ SqliteMetaImpl::UpdateCollectionFlushLSN(const std::string& collection_id, uint6
ConnectorPtr->update_all(set(c(&CollectionSchema::flush_lsn_) = flush_lsn),
where(c(&CollectionSchema::collection_id_) == collection_id));
LOG_ENGINE_DEBUG_ << "Successfully update collection flush_lsn, collection id = " << collection_id
<< " flush_lsn = " << flush_lsn;;
<< " flush_lsn = " << flush_lsn;
} catch (std::exception& e) {
std::string msg = "Encounter exception when update collection lsn: collection_id = " + collection_id;
return HandleException(msg, e.what());
@@ -1094,10 +1026,9 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
}
// perform query
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
auto match_collectionid = c(&SegmentSchema::collection_id_) == collection_id;
@@ -1309,14 +1240,10 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
}
// get files by type
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_,
&SegmentSchema::file_type_,
&SegmentSchema::file_size_,
&SegmentSchema::row_count_,
&SegmentSchema::date_,
&SegmentSchema::engine_type_,
&SegmentSchema::created_on_);
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
@@ -1348,21 +1275,29 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
file_schema.metric_type_ = collection_schema.metric_type_;
switch (file_schema.file_type_) {
case (int)SegmentSchema::RAW:++raw_count;
case (int)SegmentSchema::RAW:
++raw_count;
break;
case (int)SegmentSchema::NEW:++new_count;
case (int)SegmentSchema::NEW:
++new_count;
break;
case (int)SegmentSchema::NEW_MERGE:++new_merge_count;
case (int)SegmentSchema::NEW_MERGE:
++new_merge_count;
break;
case (int)SegmentSchema::NEW_INDEX:++new_index_count;
case (int)SegmentSchema::NEW_INDEX:
++new_index_count;
break;
case (int)SegmentSchema::TO_INDEX:++to_index_count;
case (int)SegmentSchema::TO_INDEX:
++to_index_count;
break;
case (int)SegmentSchema::INDEX:++index_count;
case (int)SegmentSchema::INDEX:
++index_count;
break;
case (int)SegmentSchema::BACKUP:++backup_count;
case (int)SegmentSchema::BACKUP:
++backup_count;
break;
default:
break;
default:break;
}
auto status = utils::GetCollectionFilePath(options_, file_schema);
@@ -1376,25 +1311,29 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
std::string msg = "Get collection files by type.";
for (int file_type : file_types) {
switch (file_type) {
case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count);
case (int)SegmentSchema::RAW:
msg = msg + " raw files:" + std::to_string(raw_count);
break;
case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count);
case (int)SegmentSchema::NEW:
msg = msg + " new files:" + std::to_string(new_count);
break;
case (int)SegmentSchema::NEW_MERGE:
msg = msg + " new_merge files:"
+ std::to_string(new_merge_count);
msg = msg + " new_merge files:" + std::to_string(new_merge_count);
break;
case (int)SegmentSchema::NEW_INDEX:
msg = msg + " new_index files:"
+ std::to_string(new_index_count);
msg = msg + " new_index files:" + std::to_string(new_index_count);
break;
case (int)SegmentSchema::TO_INDEX:msg = msg + " to_index files:" + std::to_string(to_index_count);
case (int)SegmentSchema::TO_INDEX:
msg = msg + " to_index files:" + std::to_string(to_index_count);
break;
case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count);
case (int)SegmentSchema::INDEX:
msg = msg + " index files:" + std::to_string(index_count);
break;
case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count);
case (int)SegmentSchema::BACKUP:
msg = msg + " backup files:" + std::to_string(backup_count);
break;
default:
break;
default:break;
}
}
LOG_ENGINE_DEBUG_ << msg;
@@ -1416,10 +1355,9 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hol
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.FilesByID.throw_exception", throw std::exception());
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
@@ -1613,12 +1551,12 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// collect files to be deleted
auto files = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::engine_type_, &SegmentSchema::file_id_, &SegmentSchema::file_type_,
&SegmentSchema::date_),
where(in(&SegmentSchema::file_type_, file_types) and
c(&SegmentSchema::updated_time_) < now - seconds * US_PS));
auto files =
ConnectorPtr->select(columns(&SegmentSchema::id_, &SegmentSchema::collection_id_,
&SegmentSchema::segment_id_, &SegmentSchema::engine_type_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::date_),
where(in(&SegmentSchema::file_type_, file_types) and
c(&SegmentSchema::updated_time_) < now - seconds * US_PS));
int64_t clean_files = 0;
auto commited = ConnectorPtr->transaction([&]() mutable {
@@ -1828,10 +1766,9 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto commited = ConnectorPtr->transaction([&]() mutable {
auto selected =
ConnectorPtr->select(columns(&SegmentSchema::id_, &SegmentSchema::file_size_),
where(c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE),
order_by(&SegmentSchema::id_), limit(10));
auto selected = ConnectorPtr->select(columns(&SegmentSchema::id_, &SegmentSchema::file_size_),
where(c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE),
order_by(&SegmentSchema::id_), limit(10));
std::vector<int> ids;
SegmentSchema collection_file;
@@ -1928,9 +1865,9 @@ SqliteMetaImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema
NextCollectionId(collection_schema.collection_id_);
} else {
fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception());
auto collection = ConnectorPtr->select(columns(&CollectionSchema::state_),
where(c(&CollectionSchema::collection_id_)
== collection_schema.collection_id_));
auto collection =
ConnectorPtr->select(columns(&CollectionSchema::state_),
where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_));
if (collection.size() == 1) {
if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) {
return Status(DB_ERROR,
@@ -1961,8 +1898,7 @@ SqliteMetaImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema
}
try {
for (uint64_t i = 0; i < fields_schema.fields_schema_.size(); ++i) {
hybrid::FieldSchema schema = fields_schema.fields_schema_[i];
for (auto schema : fields_schema.fields_schema_) {
auto field_id = ConnectorPtr->insert(schema);
LOG_ENGINE_DEBUG_ << "Successfully create collection field" << field_id;
}
@@ -2017,12 +1953,10 @@ SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema&
return Status(DB_NOT_FOUND, "Collection " + collection_schema.collection_id_ + " not found");
}
auto field_groups = ConnectorPtr->select(
columns(&hybrid::FieldSchema::collection_id_,
&hybrid::FieldSchema::field_name_,
&hybrid::FieldSchema::field_type_,
&hybrid::FieldSchema::field_params_),
where(c(&hybrid::FieldSchema::collection_id_) == collection_schema.collection_id_));
auto field_groups =
ConnectorPtr->select(columns(&hybrid::FieldSchema::collection_id_, &hybrid::FieldSchema::field_name_,
&hybrid::FieldSchema::field_type_, &hybrid::FieldSchema::field_params_),
where(c(&hybrid::FieldSchema::collection_id_) == collection_schema.collection_id_));
if (field_groups.size() >= 1) {
fields_schema.fields_schema_.resize(field_groups.size());


@@ -164,8 +164,6 @@ class SqliteMetaImpl : public Meta {
void
ValidateMetaSchema();
void
ValidateCollectionMetaSchema();
Status
Initialize();


@@ -12,6 +12,8 @@
#include "db/wal/WalBuffer.h"
#include <cstring>
#include <utility>
#include <vector>
#include "db/wal/WalDefinations.h"
#include "utils/Log.h"
@@ -189,6 +191,28 @@ MXLogBuffer::RecordSize(const MXLogRecord& record) {
record.length * (uint32_t)sizeof(IDNumber) + record.data_size;
}
uint32_t
MXLogBuffer::EntityRecordSize(const milvus::engine::wal::MXLogRecord& record, uint32_t attr_num,
std::vector<uint32_t>& field_name_size) {
uint32_t attr_header_size = 0;
attr_header_size += sizeof(uint32_t);
attr_header_size += attr_num * sizeof(uint64_t) * 3;
uint32_t name_sizes = 0;
for (auto field_name : record.field_names) {
field_name_size.emplace_back(field_name.size());
name_sizes += field_name.size();
}
uint64_t attr_size = 0;
auto attr_it = record.attr_data_size.begin();
for (; attr_it != record.attr_data_size.end(); attr_it++) {
attr_size += attr_it->second;
}
return RecordSize(record) + name_sizes + attr_size + attr_header_size;
}
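A quick worked example of this size computation, with illustrative values: three attribute fields named "f0", "f1" and "f22", carrying 80, 160 and 320 bytes of data in this record.

//   attr_header_size = 4 + 3 * 8 * 3  = 76   (attr_num plus three uint64 arrays)
//   name_sizes       = 2 + 2 + 3      = 7
//   attr_size        = 80 + 160 + 320 = 560
//   EntityRecordSize = RecordSize(record) + 7 + 560 + 76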
ErrorCode
MXLogBuffer::Append(MXLogRecord& record) {
uint32_t record_size = RecordSize(record);
@@ -257,6 +281,116 @@ MXLogBuffer::Append(MXLogRecord& record) {
return WAL_SUCCESS;
}
ErrorCode
MXLogBuffer::AppendEntity(milvus::engine::wal::MXLogRecord& record) {
std::vector<uint32_t> field_name_size;
MXLogAttrRecordHeader attr_header;
attr_header.attr_num = 0;
for (auto name : record.field_names) {
attr_header.attr_num++;
attr_header.field_name_size.emplace_back(name.size());
attr_header.attr_size.emplace_back(record.attr_data_size.at(name));
attr_header.attr_nbytes.emplace_back(record.attr_nbytes.at(name));
}
uint32_t record_size = EntityRecordSize(record, attr_header.attr_num, field_name_size);
if (SurplusSpace() < record_size) {
// writer buffer has no space, switch wal file and write to a new buffer
std::unique_lock<std::mutex> lck(mutex_);
if (mxlog_buffer_writer_.buf_idx == mxlog_buffer_reader_.buf_idx) {
// switch writer buffer
mxlog_buffer_reader_.max_offset = mxlog_buffer_writer_.buf_offset;
mxlog_buffer_writer_.buf_idx ^= 1;
}
mxlog_buffer_writer_.file_no++;
mxlog_buffer_writer_.buf_offset = 0;
lck.unlock();
// ReBorn closes the old wal file and opens a new one
if (!mxlog_writer_.ReBorn(ToFileName(mxlog_buffer_writer_.file_no), "w")) {
LOG_WAL_ERROR_ << "ReBorn wal file error " << mxlog_buffer_writer_.file_no;
return WAL_FILE_ERROR;
}
}
// point to the offset of current record in wal file
char* current_write_buf = buf_[mxlog_buffer_writer_.buf_idx].get();
uint32_t current_write_offset = mxlog_buffer_writer_.buf_offset;
MXLogRecordHeader head;
BuildLsn(mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset + (uint32_t)record_size, head.mxl_lsn);
head.mxl_type = (uint8_t)record.type;
head.table_id_size = (uint16_t)record.collection_id.size();
head.partition_tag_size = (uint16_t)record.partition_tag.size();
head.vector_num = record.length;
head.data_size = record.data_size;
memcpy(current_write_buf + current_write_offset, &head, SizeOfMXLogRecordHeader);
current_write_offset += SizeOfMXLogRecordHeader;
memcpy(current_write_buf + current_write_offset, &attr_header.attr_num, sizeof(int32_t));
current_write_offset += sizeof(int32_t);
memcpy(current_write_buf + current_write_offset, attr_header.field_name_size.data(),
sizeof(int64_t) * attr_header.attr_num);
current_write_offset += sizeof(int64_t) * attr_header.attr_num;
memcpy(current_write_buf + current_write_offset, attr_header.attr_size.data(),
sizeof(int64_t) * attr_header.attr_num);
current_write_offset += sizeof(int64_t) * attr_header.attr_num;
memcpy(current_write_buf + current_write_offset, attr_header.attr_nbytes.data(),
sizeof(int64_t) * attr_header.attr_num);
current_write_offset += sizeof(int64_t) * attr_header.attr_num;
if (!record.collection_id.empty()) {
memcpy(current_write_buf + current_write_offset, record.collection_id.data(), record.collection_id.size());
current_write_offset += record.collection_id.size();
}
if (!record.partition_tag.empty()) {
memcpy(current_write_buf + current_write_offset, record.partition_tag.data(), record.partition_tag.size());
current_write_offset += record.partition_tag.size();
}
if (record.ids != nullptr && record.length > 0) {
memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(IDNumber));
current_write_offset += record.length * sizeof(IDNumber);
}
if (record.data != nullptr && record.data_size > 0) {
memcpy(current_write_buf + current_write_offset, record.data, record.data_size);
current_write_offset += record.data_size;
}
// Assign attr names
for (auto name : record.field_names) {
if (name.size() > 0) {
memcpy(current_write_buf + current_write_offset, name.data(), name.size());
current_write_offset += name.size();
}
}
// Assign attr values
for (auto name : record.field_names) {
if (record.attr_data_size.at(name) != 0) {
memcpy(current_write_buf + current_write_offset, record.attr_data.at(name).data(),
record.attr_data_size.at(name));
current_write_offset += record.attr_data_size.at(name);
}
}
bool write_rst = mxlog_writer_.Write(current_write_buf + mxlog_buffer_writer_.buf_offset, record_size);
if (!write_rst) {
LOG_WAL_ERROR_ << "write wal file error";
return WAL_FILE_ERROR;
}
mxlog_buffer_writer_.buf_offset = current_write_offset;
record.lsn = head.mxl_lsn;
return WAL_SUCCESS;
}
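For readability, the byte layout AppendEntity writes, derived from the memcpy sequence above (NextEntity below parses the fields back in exactly this order):

// One entity record on disk:
//   MXLogRecordHeader              SizeOfMXLogRecordHeader bytes
//   attr_num                       4 bytes (uint32)
//   field_name_size[attr_num]      8 * attr_num bytes (uint64 each)
//   attr_size[attr_num]            8 * attr_num bytes
//   attr_nbytes[attr_num]          8 * attr_num bytes
//   collection_id                  head.table_id_size bytes
//   partition_tag                  head.partition_tag_size bytes
//   ids[vector_num]                head.vector_num * sizeof(IDNumber) bytes
//   data                           head.data_size bytes
//   field names, concatenated      sum of field_name_size bytes
//   attr values, field by field    sum of attr_size bytes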
ErrorCode
MXLogBuffer::Next(const uint64_t last_applied_lsn, MXLogRecord& record) {
// init output
@@ -337,6 +471,138 @@ MXLogBuffer::Next(const uint64_t last_applied_lsn, MXLogRecord& record) {
return WAL_SUCCESS;
}
ErrorCode
MXLogBuffer::NextEntity(const uint64_t last_applied_lsn, milvus::engine::wal::MXLogRecord& record) {
// init output
record.type = MXLogType::None;
// reader has caught up with writer, no next record to read
if (GetReadLsn() >= last_applied_lsn) {
return WAL_SUCCESS;
}
// otherwise, a next record must exist, either in the buffer or in the wal log
bool need_load_new = false;
std::unique_lock<std::mutex> lck(mutex_);
if (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no) {
if (mxlog_buffer_reader_.buf_offset == mxlog_buffer_reader_.max_offset) { // last record
mxlog_buffer_reader_.file_no++;
mxlog_buffer_reader_.buf_offset = 0;
need_load_new = (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no);
if (!need_load_new) {
// reader has reached the write buffer
mxlog_buffer_reader_.buf_idx = mxlog_buffer_writer_.buf_idx;
}
}
}
lck.unlock();
if (need_load_new) {
MXLogFileHandler mxlog_reader(mxlog_writer_.GetFilePath());
mxlog_reader.SetFileName(ToFileName(mxlog_buffer_reader_.file_no));
mxlog_reader.SetFileOpenMode("r");
uint32_t file_size = mxlog_reader.Load(buf_[mxlog_buffer_reader_.buf_idx].get(), 0);
if (file_size == 0) {
LOG_WAL_ERROR_ << "load wal file error " << mxlog_buffer_reader_.file_no;
return WAL_FILE_ERROR;
}
mxlog_buffer_reader_.max_offset = file_size;
}
char* current_read_buf = buf_[mxlog_buffer_reader_.buf_idx].get();
uint64_t current_read_offset = mxlog_buffer_reader_.buf_offset;
MXLogRecordHeader* head = (MXLogRecordHeader*)(current_read_buf + current_read_offset);
record.type = (MXLogType)head->mxl_type;
record.lsn = head->mxl_lsn;
record.length = head->vector_num;
record.data_size = head->data_size;
current_read_offset += SizeOfMXLogRecordHeader;
MXLogAttrRecordHeader attr_head;
memcpy(&attr_head.attr_num, current_read_buf + current_read_offset, sizeof(uint32_t));
current_read_offset += sizeof(uint32_t);
attr_head.attr_size.resize(attr_head.attr_num);
attr_head.field_name_size.resize(attr_head.attr_num);
attr_head.attr_nbytes.resize(attr_head.attr_num);
memcpy(attr_head.field_name_size.data(), current_read_buf + current_read_offset,
sizeof(uint64_t) * attr_head.attr_num);
current_read_offset += sizeof(uint64_t) * attr_head.attr_num;
memcpy(attr_head.attr_size.data(), current_read_buf + current_read_offset, sizeof(uint64_t) * attr_head.attr_num);
current_read_offset += sizeof(uint64_t) * attr_head.attr_num;
memcpy(attr_head.attr_nbytes.data(), current_read_buf + current_read_offset, sizeof(uint64_t) * attr_head.attr_num);
current_read_offset += sizeof(uint64_t) * attr_head.attr_num;
if (head->table_id_size != 0) {
record.collection_id.assign(current_read_buf + current_read_offset, head->table_id_size);
current_read_offset += head->table_id_size;
} else {
record.collection_id = "";
}
if (head->partition_tag_size != 0) {
record.partition_tag.assign(current_read_buf + current_read_offset, head->partition_tag_size);
current_read_offset += head->partition_tag_size;
} else {
record.partition_tag = "";
}
if (head->vector_num != 0) {
record.ids = (IDNumber*)(current_read_buf + current_read_offset);
current_read_offset += head->vector_num * sizeof(IDNumber);
} else {
record.ids = nullptr;
}
if (record.data_size != 0) {
record.data = current_read_buf + current_read_offset;
current_read_offset += record.data_size;
} else {
record.data = nullptr;
}
// Read field names
auto attr_num = attr_head.attr_num;
record.field_names.clear();
if (attr_num > 0) {
for (auto size : attr_head.field_name_size) {
if (size != 0) {
std::string name;
name.assign(current_read_buf + current_read_offset, size);
record.field_names.emplace_back(name);
current_read_offset += size;
} else {
record.field_names.emplace_back("");
}
}
}
// Read attributes data
record.attr_data.clear();
record.attr_data_size.clear();
record.attr_nbytes.clear();
if (attr_num > 0) {
for (uint64_t i = 0; i < attr_num; ++i) {
auto attr_size = attr_head.attr_size[i];
record.attr_data_size.insert(std::make_pair(record.field_names[i], attr_size));
record.attr_nbytes.insert(std::make_pair(record.field_names[i], attr_head.attr_nbytes[i]));
std::vector<uint8_t> data(attr_size);
memcpy(data.data(), current_read_buf + current_read_offset, attr_size);
record.attr_data.insert(std::make_pair(record.field_names[i], data));
current_read_offset += attr_size;
}
}
mxlog_buffer_reader_.buf_offset = uint32_t(head->mxl_lsn & LSN_OFFSET_MASK);
return WAL_SUCCESS;
}
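The final offset update works because BuildLsn packs a record's end offset into the low bits of its lsn. A sketch of that convention, assumed from the LSN_OFFSET_MASK usage here (the exact bit split lives in BuildLsn):

// Assumed packing: file number in the high bits, byte offset in the low bits.
//   lsn = ((uint64_t)file_no << 32) | end_offset_of_record;
// so (head->mxl_lsn & LSN_OFFSET_MASK) recovers where the next record begins.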
uint64_t
MXLogBuffer::GetReadLsn() {
uint64_t read_lsn;


@@ -15,6 +15,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "WalDefinations.h"
#include "WalFileHandler.h"
@@ -39,6 +40,13 @@ struct MXLogRecordHeader {
const uint32_t SizeOfMXLogRecordHeader = sizeof(MXLogRecordHeader);
struct MXLogAttrRecordHeader {
uint32_t attr_num;
std::vector<uint64_t> field_name_size;
std::vector<uint64_t> attr_size;
std::vector<uint64_t> attr_nbytes;
};
#pragma pack(pop)
struct MXLogBufferHandler {
@@ -66,9 +74,15 @@ class MXLogBuffer {
ErrorCode
Append(MXLogRecord& record);
ErrorCode
AppendEntity(MXLogRecord& record);
ErrorCode
Next(const uint64_t last_applied_lsn, MXLogRecord& record);
ErrorCode
NextEntity(const uint64_t last_applied_lsn, MXLogRecord& record);
uint64_t
GetReadLsn();
@@ -91,6 +105,10 @@ class MXLogBuffer {
uint32_t
RecordSize(const MXLogRecord& record);
uint32_t
EntityRecordSize(const milvus::engine::wal::MXLogRecord& record, uint32_t attr_num,
std::vector<uint32_t>& field_name_size);
private:
uint32_t mxlog_buffer_size_; // from config
BufferPtr buf_[2];


@@ -40,6 +40,9 @@ struct MXLogRecord {
const IDNumber* ids;
uint32_t data_size;
const void* data;
std::vector<std::string> field_names;
// std::vector<uint32_t> attrs_size;
// std::vector<const void* > attrs_data;
std::unordered_map<std::string, uint64_t> attr_nbytes;
std::unordered_map<std::string, uint64_t> attr_data_size;
std::unordered_map<std::string, std::vector<uint8_t>> attr_data;


@@ -15,6 +15,7 @@
#include <algorithm>
#include <memory>
#include <unordered_map>
#include "config/Config.h"
#include "utils/CommonUtil.h"
@@ -157,6 +158,44 @@ WalManager::GetNextRecovery(MXLogRecord& record) {
return error_code;
}
ErrorCode
WalManager::GetNextEntityRecovery(milvus::engine::wal::MXLogRecord& record) {
ErrorCode error_code = WAL_SUCCESS;
while (true) {
error_code = p_buffer_->NextEntity(last_applied_lsn_, record);
if (error_code != WAL_SUCCESS) {
if (mxlog_config_.recovery_error_ignore) {
// reset and break recovery
p_buffer_->Reset(last_applied_lsn_);
record.type = MXLogType::None;
error_code = WAL_SUCCESS;
}
break;
}
if (record.type == MXLogType::None) {
break;
}
// background thread has not started,
// so there is no need to lock here
auto it = tables_.find(record.collection_id);
if (it != tables_.end()) {
if (it->second.flush_lsn < record.lsn) {
break;
}
}
}
// print the log only when record.type != MXLogType::None
if (record.type != MXLogType::None) {
LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " record lsn " << record.lsn << " error code "
<< error_code;
}
return error_code;
}
ErrorCode
WalManager::GetNextRecord(MXLogRecord& record) {
auto check_flush = [&]() -> bool {
@@ -203,6 +242,52 @@ WalManager::GetNextRecord(MXLogRecord& record) {
return error_code;
}
ErrorCode
WalManager::GetNextEntityRecord(milvus::engine::wal::MXLogRecord& record) {
auto check_flush = [&]() -> bool {
std::lock_guard<std::mutex> lck(mutex_);
if (flush_info_.IsValid()) {
if (p_buffer_->GetReadLsn() >= flush_info_.lsn_) {
// can exec flush requirement
record.type = MXLogType::Flush;
record.collection_id = flush_info_.collection_id_;
record.lsn = flush_info_.lsn_;
flush_info_.Clear();
LOG_WAL_INFO_ << "record flush collection " << record.collection_id << " lsn " << record.lsn;
return true;
}
}
return false;
};
if (check_flush()) {
return WAL_SUCCESS;
}
ErrorCode error_code = WAL_SUCCESS;
while (WAL_SUCCESS == p_buffer_->NextEntity(last_applied_lsn_, record)) {
if (record.type == MXLogType::None) {
if (check_flush()) {
return WAL_SUCCESS;
}
break;
}
std::lock_guard<std::mutex> lck(mutex_);
auto it = tables_.find(record.collection_id);
if (it != tables_.end()) {
if (it->second.flush_lsn < record.lsn) {
break;
}
}
}
LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn "
<< record.lsn;
return error_code;
}
uint64_t
WalManager::CreateCollection(const std::string& collection_id) {
LOG_WAL_INFO_ << "create collection " << collection_id << " " << last_applied_lsn_;
@@ -212,6 +297,15 @@ WalManager::CreateCollection(const std::string& collection_id) {
return applied_lsn;
}
uint64_t
WalManager::CreateHybridCollection(const std::string& collection_id) {
LOG_WAL_INFO_ << "create hybrid collection " << collection_id << " " << last_applied_lsn_;
std::lock_guard<std::mutex> lck(mutex_);
uint64_t applied_lsn = last_applied_lsn_;
tables_[collection_id] = {applied_lsn, applied_lsn};
return applied_lsn;
}
void
WalManager::DropCollection(const std::string& collection_id) {
LOG_WAL_INFO_ << "drop collection " << collection_id;
@@ -300,6 +394,98 @@ WalManager::Insert(const std::string& collection_id, const std::string& partitio
return p_meta_handler_->SetMXLogInternalMeta(new_lsn);
}
template <typename T>
bool
WalManager::InsertEntities(const std::string& collection_id, const std::string& partition_tag,
const milvus::engine::IDNumbers& entity_ids, const std::vector<T>& vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, std::vector<uint8_t>>& attrs) {
MXLogType log_type;
if (std::is_same<T, float>::value) {
log_type = MXLogType::Entity;
} else {
return false;
}
size_t entity_num = entity_ids.size();
if (entity_num == 0) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld] The ids is empty.", "insert", 0);
return false;
}
size_t dim = vectors.size() / entity_num;
MXLogRecord record;
size_t attr_unit_size = 0;
auto attr_it = attr_nbytes.begin();
for (; attr_it != attr_nbytes.end(); attr_it++) {
record.field_names.emplace_back(attr_it->first);
attr_unit_size += attr_it->second;
}
size_t unit_size = dim * sizeof(T) + sizeof(IDNumber) + attr_unit_size;
size_t head_size = SizeOfMXLogRecordHeader + collection_id.length() + partition_tag.length();
// TODO(yukun): field_name put into MXLogRecord??
record.type = log_type;
record.collection_id = collection_id;
record.partition_tag = partition_tag;
record.attr_nbytes = attr_nbytes;
uint64_t new_lsn = 0;
for (size_t i = 0; i < entity_num; i += record.length) {
size_t surplus_space = p_buffer_->SurplusSpace();
size_t max_rcd_num = 0;
if (surplus_space >= head_size + unit_size) {
max_rcd_num = (surplus_space - head_size) / unit_size;
} else {
max_rcd_num = (mxlog_config_.buffer_size - head_size) / unit_size;
}
if (max_rcd_num == 0) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld]", "insert", 0) << "Wal buffer size is too small "
<< mxlog_config_.buffer_size << " unit " << unit_size;
return false;
}
size_t length = std::min(entity_num - i, max_rcd_num);
record.length = length;
record.ids = entity_ids.data() + i;
record.data_size = record.length * dim * sizeof(T);
record.data = vectors.data() + i * dim;
record.attr_data.clear();
record.attr_data_size.clear();
for (auto field_name : record.field_names) {
size_t attr_size = length * attr_nbytes.at(field_name);
record.attr_data_size.insert(std::make_pair(field_name, attr_size));
std::vector<uint8_t> attr_data(attr_size, 0);
memcpy(attr_data.data(), attrs.at(field_name).data() + i * attr_nbytes.at(field_name), attr_size);
record.attr_data.insert(std::make_pair(field_name, attr_data));
}
auto error_code = p_buffer_->AppendEntity(record);
if (error_code != WAL_SUCCESS) {
p_buffer_->ResetWriteLsn(last_applied_lsn_);
return false;
}
new_lsn = record.lsn;
}
std::unique_lock<std::mutex> lck(mutex_);
last_applied_lsn_ = new_lsn;
auto it = tables_.find(collection_id);
if (it != tables_.end()) {
it->second.wal_lsn = new_lsn;
}
lck.unlock();
LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag
<< " with lsn " << new_lsn;
return p_meta_handler_->SetMXLogInternalMeta(new_lsn);
}
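A worked example of the chunking above, with illustrative numbers (IDNumber is assumed to be a 64-bit id):

// dim = 128 float dimensions, one int64 attribute, 4 MB wal buffer:
//   unit_size   = 128 * sizeof(float) + sizeof(IDNumber) + 8 = 512 + 8 + 8 = 528 bytes
//   max_rcd_num ≈ (4 * 1024 * 1024 - head_size) / 528        ≈ 7900 entities per record
// so an insert of 100000 entities is split across ~13 WAL records, each carrying
// its slice of ids, vector data and per-field attribute bytes.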
bool
WalManager::DeleteById(const std::string& collection_id, const IDNumbers& vector_ids) {
size_t vector_num = vector_ids.size();
@@ -404,6 +590,18 @@ template bool
WalManager::Insert<uint8_t>(const std::string& collection_id, const std::string& partition_tag,
const IDNumbers& vector_ids, const std::vector<uint8_t>& vectors);
template bool
WalManager::InsertEntities<float>(const std::string& collection_id, const std::string& partition_tag,
const milvus::engine::IDNumbers& entity_ids, const std::vector<float>& vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, std::vector<uint8_t>>& attrs);
template bool
WalManager::InsertEntities<uint8_t>(const std::string& collection_id, const std::string& partition_tag,
const milvus::engine::IDNumbers& entity_ids, const std::vector<uint8_t>& vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, std::vector<uint8_t>>& attrs);
} // namespace wal
} // namespace engine
} // namespace milvus

View File

@ -14,6 +14,7 @@
#include <atomic>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
@ -48,6 +49,9 @@ class WalManager {
ErrorCode
GetNextRecovery(MXLogRecord& record);
/*
* Get next entity recovery record
* @param record[out]: record
*/
ErrorCode
GetNextEntityRecovery(MXLogRecord& record);
/*
* Get next record
* @param record[out]: record
@ -56,6 +60,8 @@ class WalManager {
ErrorCode
GetNextRecord(MXLogRecord& record);
/*
* Get next entity record
* @param record[out]: record
*/
ErrorCode
GetNextEntityRecord(MXLogRecord& record);
/*
* Create collection
* @param collection_id: collection id
@ -64,6 +70,14 @@ class WalManager {
uint64_t
CreateCollection(const std::string& collection_id);
/*
* Create hybrid collection
* @param collection_id: collection id
* @retval lsn
*/
uint64_t
CreateHybridCollection(const std::string& collection_id);
/*
* Drop collection
* @param collection_id: collection id
@ -92,6 +106,21 @@ class WalManager {
Insert(const std::string& collection_id, const std::string& partition_tag, const IDNumbers& vector_ids,
const std::vector<T>& vectors);
/*
* Insert entities
* @param collection_id: collection id
* @param partition_tag: partition tag
* @param entity_ids: entity ids
* @param vectors: entity vectors
* @param attr_nbytes: per-field attribute element size in bytes
* @param attrs: per-field attribute data
*/
template <typename T>
bool
InsertEntities(const std::string& collection_id, const std::string& partition_tag,
const milvus::engine::IDNumbers& entity_ids, const std::vector<T>& vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, std::vector<uint8_t>>& attrs);
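For orientation, here is a hedged caller-side sketch of how the attr_nbytes and attrs arguments are expected to be laid out; the field names, element types, and entity count are assumed for illustration. Each field maps to its per-entity element width, and to a flat byte buffer holding one value per entity:

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
    const size_t n = 100;  // entity count (assumed)
    std::vector<int64_t> field_0(n, 1);
    std::vector<double> field_1(n, 0.5);

    std::unordered_map<std::string, uint64_t> attr_nbytes = {
        {"field_0", sizeof(int64_t)}, {"field_1", sizeof(double)}};

    std::unordered_map<std::string, std::vector<uint8_t>> attrs;
    attrs["field_0"].resize(n * sizeof(int64_t));
    std::memcpy(attrs["field_0"].data(), field_0.data(), n * sizeof(int64_t));
    attrs["field_1"].resize(n * sizeof(double));
    std::memcpy(attrs["field_1"].data(), field_1.data(), n * sizeof(double));

    // the maps would then be handed to the manager, e.g.:
    // wal_mgr->InsertEntities<float>(collection_id, partition_tag, entity_ids, vectors, attr_nbytes, attrs);
    std::cout << attrs["field_0"].size() + attrs["field_1"].size() << " attribute bytes for " << n << " entities\n";
    return 0;
}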
/*
* Insert
* @param collection_id: collection id

View File

@ -88,26 +88,26 @@ BuildEntity(uint64_t n, uint64_t batch_index, milvus::engine::Entity& entity) {
vectors.id_array_.push_back(n * batch_index + i);
}
entity.vector_data_.insert(std::make_pair("field_3", vectors));
std::vector<int32_t> value_0;
std::vector<int64_t> value_0;
std::vector<int64_t> value_1;
std::vector<float> value_2;
std::vector<double> value_2;
value_0.resize(n);
value_1.resize(n);
value_2.resize(n);
for (uint64_t i = 0; i < n; ++i) {
value_0[i] = i;
value_1[i] = i + n;
value_2[i] = (float)((i + 100) / (n + 1));
value_2[i] = (double)(i + 100) / (n + 1);
}
entity.entity_count_ = n;
size_t attr_size = n * (sizeof(int32_t) + sizeof(float) + sizeof(int64_t));
size_t attr_size = n * (sizeof(int64_t) + sizeof(double) + sizeof(int64_t));
std::vector<uint8_t> attr_value(attr_size, 0);
size_t offset = 0;
memcpy(attr_value.data(), value_0.data(), n * sizeof(int32_t));
offset += n * sizeof(int32_t);
memcpy(attr_value.data(), value_0.data(), n * sizeof(int64_t));
offset += n * sizeof(int64_t);
memcpy(attr_value.data() + offset, value_1.data(), n * sizeof(int64_t));
offset += n * sizeof(int64_t);
memcpy(attr_value.data() + offset, value_2.data(), n * sizeof(float));
memcpy(attr_value.data() + offset, value_2.data(), n * sizeof(double));
entity.attr_value_ = attr_value;
}
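The attr_value buffer built above is a flat concatenation of the three fields' blocks. A minimal decode sketch under that assumed layout (n int64 values, then n int64 values, then n doubles):

#include <cstdint>
#include <cstring>
#include <vector>

int main() {
    const uint64_t n = 4;
    std::vector<uint8_t> attr_value(n * (2 * sizeof(int64_t) + sizeof(double)), 0);
    // recover the second field by skipping the first field's block
    std::vector<int64_t> value_1(n);
    size_t offset = n * sizeof(int64_t);
    std::memcpy(value_1.data(), attr_value.data() + offset, n * sizeof(int64_t));
    return 0;
}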

View File

@ -15,12 +15,11 @@
#include "db/meta/SqliteMetaImpl.h"
#include "db/utils.h"
#include <fiu-control.h>
#include <fiu-local.h>
#include <gtest/gtest.h>
#include <stdlib.h>
#include <time.h>
#include <thread>
#include <boost/filesystem/operations.hpp>
TEST_F(MetaTest, COLLECTION_TEST) {
@ -71,7 +70,7 @@ TEST_F(MetaTest, FALID_TEST) {
fiu_disable("SqliteMetaImpl.ValidateMetaSchema.NullConnection");
}
{
//failed initialize
// failed initialize
auto options_1 = options;
options_1.meta_.path_ = options.meta_.path_ + "1";
if (boost::filesystem::is_directory(options_1.meta_.path_)) {
@ -97,7 +96,7 @@ TEST_F(MetaTest, FALID_TEST) {
ASSERT_FALSE(status.ok());
fiu_disable("SqliteMetaImpl.CreateCollection.insert_throw_exception");
//success create collection
// success create collection
collection.collection_id_ = collection_id;
status = impl_->CreateCollection(collection);
ASSERT_TRUE(status.ok());
@ -236,7 +235,7 @@ TEST_F(MetaTest, FALID_TEST) {
status = impl_->CreatePartition(collection_id, partition, partition_tag, 0);
ASSERT_FALSE(status.ok());
//create empty name partition
// create empty name partition
partition = "";
status = impl_->CreatePartition(collection_id, partition, partition_tag, 0);
ASSERT_TRUE(status.ok());
@ -381,6 +380,34 @@ TEST_F(MetaTest, COLLECTION_FILE_TEST) {
ASSERT_EQ(table_file.file_type_, new_file_type);
}
TEST_F(MetaTest, HYBRID_COLLECTION_TEST) {
auto collection_id = "meta_test_hybrid";
milvus::engine::meta::CollectionSchema collection;
collection.collection_id_ = collection_id;
collection.dimension_ = 128;
milvus::engine::meta::hybrid::FieldsSchema fields_schema;
fields_schema.fields_schema_.resize(2);
fields_schema.fields_schema_[0].collection_id_ = collection_id;
fields_schema.fields_schema_[0].field_name_ = "field_0";
fields_schema.fields_schema_[0].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::INT64;
fields_schema.fields_schema_[0].field_params_ = "";
fields_schema.fields_schema_[1].collection_id_ = collection_id;
fields_schema.fields_schema_[1].field_name_ = "field_1";
fields_schema.fields_schema_[1].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::VECTOR;
fields_schema.fields_schema_[1].field_params_ = "";
auto status = impl_->CreateHybridCollection(collection, fields_schema);
ASSERT_TRUE(status.ok());
milvus::engine::meta::CollectionSchema describe_collection;
milvus::engine::meta::hybrid::FieldsSchema describe_fields;
describe_collection.collection_id_ = collection_id;
status = impl_->DescribeHybridCollection(describe_collection, describe_fields);
ASSERT_TRUE(status.ok());
ASSERT_EQ(describe_fields.fields_schema_.size(), 2);
}
TEST_F(MetaTest, COLLECTION_FILE_ROW_COUNT_TEST) {
auto collection_id = "row_count_test_table";
@ -649,9 +676,9 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
ASSERT_FALSE(status.ok());
file_types = {
milvus::engine::meta::SegmentSchema::NEW, milvus::engine::meta::SegmentSchema::NEW_MERGE,
milvus::engine::meta::SegmentSchema::NEW_INDEX, milvus::engine::meta::SegmentSchema::TO_INDEX,
milvus::engine::meta::SegmentSchema::INDEX, milvus::engine::meta::SegmentSchema::RAW,
milvus::engine::meta::SegmentSchema::BACKUP,
};
status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);

View File

@ -14,6 +14,8 @@
#include "db/meta/MySQLMetaImpl.h"
#include "db/utils.h"
#include <fiu-control.h>
#include <fiu-local.h>
#include <gtest/gtest.h>
#include <mysql++/mysql++.h>
#include <stdlib.h>
@ -21,8 +23,6 @@
#include <boost/filesystem/operations.hpp>
#include <iostream>
#include <thread>
const char* FAILED_CONNECT_SQL_SERVER = "Failed to connect to meta server(mysql)";
const char* COLLECTION_ALREADY_EXISTS = "Collection already exists and it is in delete state, please wait a second";
@ -67,7 +67,7 @@ TEST_F(MySqlMetaTest, COLLECTION_TEST) {
ASSERT_FALSE(stat.ok());
fiu_disable("MySQLMetaImpl.CreateCollection.throw_exception");
//ensure collection exists
// ensure collection exists
stat = impl_->CreateCollection(collection);
FIU_ENABLE_FIU("MySQLMetaImpl.CreateCollection.schema_TO_DELETE");
stat = impl_->CreateCollection(collection);
@ -121,6 +121,34 @@ TEST_F(MySqlMetaTest, COLLECTION_TEST) {
ASSERT_TRUE(status.ok());
}
TEST_F(MySqlMetaTest, HYBRID_COLLECTION_TEST) {
auto collection_id = "meta_test_hybrid";
milvus::engine::meta::CollectionSchema collection;
collection.collection_id_ = collection_id;
collection.dimension_ = 128;
milvus::engine::meta::hybrid::FieldsSchema fields_schema;
fields_schema.fields_schema_.resize(2);
fields_schema.fields_schema_[0].collection_id_ = collection_id;
fields_schema.fields_schema_[0].field_name_ = "field_0";
fields_schema.fields_schema_[0].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::INT64;
fields_schema.fields_schema_[0].field_params_ = "";
fields_schema.fields_schema_[1].collection_id_ = collection_id;
fields_schema.fields_schema_[1].field_name_ = "field_1";
fields_schema.fields_schema_[1].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::VECTOR;
fields_schema.fields_schema_[1].field_params_ = "";
auto status = impl_->CreateHybridCollection(collection, fields_schema);
ASSERT_TRUE(status.ok());
milvus::engine::meta::CollectionSchema describe_collection;
milvus::engine::meta::hybrid::FieldsSchema describe_fields;
describe_collection.collection_id_ = collection_id;
status = impl_->DescribeHybridCollection(describe_collection, describe_fields);
ASSERT_TRUE(status.ok());
ASSERT_EQ(describe_fields.fields_schema_.size(), 2);
}
TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
auto collection_id = "meta_test_table";
fiu_init(0);
@ -135,7 +163,7 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
collection.dimension_ = 256;
status = impl_->CreateCollection(collection);
//CreateCollectionFile
// CreateCollectionFile
milvus::engine::meta::SegmentSchema table_file;
table_file.collection_id_ = collection.collection_id_;
status = impl_->CreateCollectionFile(table_file);
@ -157,7 +185,7 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.DescribeCollection.throw_exception");
//Count
// Count
uint64_t cnt = 0;
status = impl_->Count(collection_id, cnt);
// ASSERT_TRUE(status.ok());
@ -182,7 +210,7 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
auto new_file_type = milvus::engine::meta::SegmentSchema::INDEX;
table_file.file_type_ = new_file_type;
//UpdateCollectionFile
// UpdateCollectionFile
FIU_ENABLE_FIU("MySQLMetaImpl.UpdateCollectionFile.null_connection");
status = impl_->UpdateCollectionFile(table_file);
ASSERT_FALSE(status.ok());
@ -487,7 +515,7 @@ TEST_F(MySqlMetaTest, INVALID_INITILIZE_TEST) {
milvus::engine::DBMetaOptions meta = GetOptions().meta_;
{
FIU_ENABLE_FIU("MySQLMetaImpl.Initialize.fail_create_directory");
//delete directory created by SetUp
// delete directory created by SetUp
boost::filesystem::remove_all(meta.path_);
ASSERT_ANY_THROW(milvus::engine::meta::MySQLMetaImpl impl(meta, GetOptions().mode_));
fiu_disable("MySQLMetaImpl.Initialize.fail_create_directory");
@ -674,9 +702,9 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
ASSERT_FALSE(status.ok());
file_types = {
milvus::engine::meta::SegmentSchema::NEW, milvus::engine::meta::SegmentSchema::NEW_MERGE,
milvus::engine::meta::SegmentSchema::NEW_INDEX, milvus::engine::meta::SegmentSchema::TO_INDEX,
milvus::engine::meta::SegmentSchema::INDEX, milvus::engine::meta::SegmentSchema::RAW,
milvus::engine::meta::SegmentSchema::BACKUP,
};
status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
@ -810,4 +838,3 @@ TEST_F(MySqlMetaTest, INDEX_TEST) {
status = impl_->UpdateCollectionFilesToIndex(collection_id);
ASSERT_TRUE(status.ok());
}

View File

@ -16,6 +16,7 @@
#include <time.h>
#include <fstream>
#include <random>
#include <sstream>
#include <thread>
@ -413,6 +414,254 @@ TEST(WalTest, BUFFER_TEST) {
}
}
TEST(WalTest, HYBRID_BUFFER_TEST) {
MakeEmptyTestPath();
milvus::engine::wal::MXLogBuffer buffer(WAL_GTEST_PATH, 2048);
uint32_t file_no = 4;
uint32_t buf_off = 100;
uint64_t lsn = (uint64_t)file_no << 32 | buf_off;
buffer.mxlog_buffer_size_ = 2000;
buffer.Reset(lsn);
milvus::engine::wal::MXLogRecord record[4];
milvus::engine::wal::MXLogRecord read_rst;
// write 0
record[0].type = milvus::engine::wal::MXLogType::Entity;
record[0].collection_id = "insert_hybrid_collection";
record[0].partition_tag = "parti1";
uint64_t length = 50;
record[0].length = length;
record[0].ids = (milvus::engine::IDNumber*)malloc(record[0].length * sizeof(milvus::engine::IDNumber));
record[0].data_size = record[0].length * sizeof(float);
record[0].data = malloc(record[0].data_size);
record[0].field_names.resize(2);
record[0].field_names[0] = "field_0";
record[0].field_names[1] = "field_1";
record[0].attr_data_size.insert(std::make_pair("field_0", length * sizeof(int64_t)));
record[0].attr_data_size.insert(std::make_pair("field_1", length * sizeof(float)));
record[0].attr_nbytes.insert(std::make_pair("field_0", sizeof(uint64_t)));
record[0].attr_nbytes.insert(std::make_pair("field_1", sizeof(float)));
std::vector<int64_t> data_0(length);
std::default_random_engine e;
std::uniform_int_distribution<unsigned> u(0, 1000);
for (uint64_t i = 0; i < length; ++i) {
data_0[i] = u(e);
}
std::vector<uint8_t> attr_data_0(length * sizeof(int64_t));
memcpy(attr_data_0.data(), data_0.data(), length * sizeof(int64_t));
record[0].attr_data.insert(std::make_pair("field_0", attr_data_0));
std::vector<float> data_1(length);
std::default_random_engine e1;
std::uniform_real_distribution<float> u1(0, 1);
for (uint64_t i = 0; i < length; ++i) {
data_1[i] = u1(e1);
}
std::vector<uint8_t> attr_data_1(length * sizeof(float));
memcpy(attr_data_1.data(), data_1.data(), length * sizeof(float));
record[0].attr_data.insert(std::make_pair("field_1", attr_data_1));
ASSERT_EQ(buffer.AppendEntity(record[0]), milvus::WAL_SUCCESS);
uint32_t new_file_no = uint32_t(record[0].lsn >> 32);
ASSERT_EQ(new_file_no, ++file_no);
// write 1
record[1].type = milvus::engine::wal::MXLogType::Delete;
record[1].collection_id = "insert_hybrid_collection";
record[1].partition_tag = "parti1";
length = 10;
record[1].length = length;
record[1].ids = (milvus::engine::IDNumber*)malloc(record[1].length * sizeof(milvus::engine::IDNumber));
record[1].data_size = 0;
record[1].data = nullptr;
record[1].field_names.resize(2);
record[1].field_names[0] = "field_0";
record[1].field_names[1] = "field_1";
record[1].attr_data_size.insert(std::make_pair("field_0", length * sizeof(int64_t)));
record[1].attr_data_size.insert(std::make_pair("field_1", length * sizeof(float)));
record[1].attr_nbytes.insert(std::make_pair("field_0", sizeof(uint64_t)));
record[1].attr_nbytes.insert(std::make_pair("field_1", sizeof(float)));
std::vector<int64_t> data1_0(length);
for (uint64_t i = 0; i < length; ++i) {
data1_0[i] = u(e);
}
std::vector<uint8_t> attr_data1_0(length * sizeof(int64_t));
memcpy(attr_data1_0.data(), data1_0.data(), length * sizeof(int64_t));
record[1].attr_data.insert(std::make_pair("field_0", attr_data1_0));
std::vector<float> data1_1(length);
for (uint64_t i = 0; i < length; ++i) {
data1_1[i] = u1(e1);
}
std::vector<uint8_t> attr_data1_1(length * sizeof(float));
memcpy(attr_data1_1.data(), data1_1.data(), length * sizeof(float));
record[1].attr_data.insert(std::make_pair("field_1", attr_data1_1));
ASSERT_EQ(buffer.AppendEntity(record[1]), milvus::WAL_SUCCESS);
new_file_no = uint32_t(record[1].lsn >> 32);
ASSERT_EQ(new_file_no, file_no);
// read 0
ASSERT_EQ(buffer.NextEntity(record[1].lsn, read_rst), milvus::WAL_SUCCESS);
ASSERT_EQ(read_rst.type, record[0].type);
ASSERT_EQ(read_rst.collection_id, record[0].collection_id);
ASSERT_EQ(read_rst.partition_tag, record[0].partition_tag);
ASSERT_EQ(read_rst.length, record[0].length);
ASSERT_EQ(memcmp(read_rst.ids, record[0].ids, read_rst.length * sizeof(milvus::engine::IDNumber)), 0);
ASSERT_EQ(read_rst.data_size, record[0].data_size);
ASSERT_EQ(memcmp(read_rst.data, record[0].data, read_rst.data_size), 0);
ASSERT_EQ(read_rst.field_names.size(), record[0].field_names.size());
ASSERT_EQ(read_rst.field_names[0], record[0].field_names[0]);
ASSERT_EQ(read_rst.attr_data.at("field_0").size(), record[0].attr_data.at("field_0").size());
ASSERT_EQ(read_rst.attr_nbytes.at("field_0"), record[0].attr_nbytes.at("field_0"));
// read 1
ASSERT_EQ(buffer.NextEntity(record[1].lsn, read_rst), milvus::WAL_SUCCESS);
ASSERT_EQ(read_rst.type, record[1].type);
ASSERT_EQ(read_rst.collection_id, record[1].collection_id);
ASSERT_EQ(read_rst.partition_tag, record[1].partition_tag);
ASSERT_EQ(read_rst.length, record[1].length);
ASSERT_EQ(memcmp(read_rst.ids, record[1].ids, read_rst.length * sizeof(milvus::engine::IDNumber)), 0);
ASSERT_EQ(read_rst.data_size, 0);
ASSERT_EQ(read_rst.data, nullptr);
ASSERT_EQ(read_rst.field_names.size(), record[1].field_names.size());
ASSERT_EQ(read_rst.field_names[1], record[1].field_names[1]);
ASSERT_EQ(read_rst.attr_data.at("field_1").size(), record[1].attr_data.at("field_1").size());
ASSERT_EQ(read_rst.attr_nbytes.at("field_0"), record[1].attr_nbytes.at("field_0"));
// read empty
ASSERT_EQ(buffer.NextEntity(record[1].lsn, read_rst), milvus::WAL_SUCCESS);
ASSERT_EQ(read_rst.type, milvus::engine::wal::MXLogType::None);
// write 2 (new file)
record[2].type = milvus::engine::wal::MXLogType::Entity;
record[2].collection_id = "insert_table";
record[2].partition_tag = "parti1";
length = 50;
record[2].length = length;
record[2].ids = (milvus::engine::IDNumber*)malloc(record[2].length * sizeof(milvus::engine::IDNumber));
record[2].data_size = record[2].length * sizeof(float);
record[2].data = malloc(record[2].data_size);
record[2].field_names.resize(2);
record[2].field_names[0] = "field_0";
record[2].field_names[1] = "field_1";
record[2].attr_data_size.insert(std::make_pair("field_0", length * sizeof(int64_t)));
record[2].attr_data_size.insert(std::make_pair("field_1", length * sizeof(float)));
record[2].attr_data.insert(std::make_pair("field_0", attr_data_0));
record[2].attr_data.insert(std::make_pair("field_1", attr_data_1));
record[2].attr_nbytes.insert(std::make_pair("field_0", sizeof(uint64_t)));
record[2].attr_nbytes.insert(std::make_pair("field_1", sizeof(float)));
ASSERT_EQ(buffer.AppendEntity(record[2]), milvus::WAL_SUCCESS);
new_file_no = uint32_t(record[2].lsn >> 32);
ASSERT_EQ(new_file_no, ++file_no);
// write 3 (new file)
record[3].type = milvus::engine::wal::MXLogType::Entity;
record[3].collection_id = "insert_table";
record[3].partition_tag = "parti1";
record[3].length = 10;
record[3].ids = (milvus::engine::IDNumber*)malloc(record[3].length * sizeof(milvus::engine::IDNumber));
record[3].data_size = record[3].length * sizeof(uint8_t);
record[3].data = malloc(record[3].data_size);
record[3].field_names.resize(2);
record[3].field_names[0] = "field_0";
record[3].field_names[1] = "field_1";
record[3].attr_data_size.insert(std::make_pair("field_0", record[3].length * sizeof(int64_t)));
record[3].attr_data_size.insert(std::make_pair("field_1", record[3].length * sizeof(float)));
record[3].attr_data.insert(std::make_pair("field_0", attr_data1_0));
record[3].attr_data.insert(std::make_pair("field_1", attr_data1_1));
record[3].attr_nbytes.insert(std::make_pair("field_0", sizeof(uint64_t)));
record[3].attr_nbytes.insert(std::make_pair("field_1", sizeof(float)));
ASSERT_EQ(buffer.AppendEntity(record[3]), milvus::WAL_SUCCESS);
new_file_no = uint32_t(record[3].lsn >> 32);
ASSERT_EQ(new_file_no, ++file_no);
// reset write lsn back so records 2 and 3 can be rewritten
ASSERT_TRUE(buffer.ResetWriteLsn(record[3].lsn));
ASSERT_TRUE(buffer.ResetWriteLsn(record[2].lsn));
ASSERT_TRUE(buffer.ResetWriteLsn(record[1].lsn));
// write 2 and 3 again
ASSERT_EQ(buffer.AppendEntity(record[2]), milvus::WAL_SUCCESS);
ASSERT_EQ(buffer.AppendEntity(record[3]), milvus::WAL_SUCCESS);
// read 2
ASSERT_EQ(buffer.NextEntity(record[3].lsn, read_rst), milvus::WAL_SUCCESS);
ASSERT_EQ(read_rst.type, record[2].type);
ASSERT_EQ(read_rst.collection_id, record[2].collection_id);
ASSERT_EQ(read_rst.partition_tag, record[2].partition_tag);
ASSERT_EQ(read_rst.length, record[2].length);
ASSERT_EQ(memcmp(read_rst.ids, record[2].ids, read_rst.length * sizeof(milvus::engine::IDNumber)), 0);
ASSERT_EQ(read_rst.data_size, record[2].data_size);
ASSERT_EQ(memcmp(read_rst.data, record[2].data, read_rst.data_size), 0);
ASSERT_EQ(read_rst.field_names.size(), record[2].field_names.size());
ASSERT_EQ(read_rst.field_names[1], record[2].field_names[1]);
ASSERT_EQ(read_rst.attr_data.at("field_1").size(), record[2].attr_data.at("field_1").size());
ASSERT_EQ(read_rst.attr_nbytes.at("field_0"), record[2].attr_nbytes.at("field_0"));
// read 3
ASSERT_EQ(buffer.NextEntity(record[3].lsn, read_rst), milvus::WAL_SUCCESS);
ASSERT_EQ(read_rst.type, record[3].type);
ASSERT_EQ(read_rst.collection_id, record[3].collection_id);
ASSERT_EQ(read_rst.partition_tag, record[3].partition_tag);
ASSERT_EQ(read_rst.length, record[3].length);
ASSERT_EQ(memcmp(read_rst.ids, record[3].ids, read_rst.length * sizeof(milvus::engine::IDNumber)), 0);
ASSERT_EQ(read_rst.data_size, record[3].data_size);
ASSERT_EQ(memcmp(read_rst.data, record[3].data, read_rst.data_size), 0);
ASSERT_EQ(read_rst.field_names.size(), record[3].field_names.size());
ASSERT_EQ(read_rst.field_names[1], record[3].field_names[1]);
ASSERT_EQ(read_rst.attr_nbytes.at("field_0"), record[3].attr_nbytes.at("field_0"));
// test an empty record
milvus::engine::wal::MXLogRecord empty;
empty.type = milvus::engine::wal::MXLogType::None;
empty.length = 0;
empty.data_size = 0;
ASSERT_EQ(buffer.AppendEntity(empty), milvus::WAL_SUCCESS);
ASSERT_EQ(buffer.NextEntity(empty.lsn, read_rst), milvus::WAL_SUCCESS);
ASSERT_EQ(read_rst.type, milvus::engine::wal::MXLogType::None);
ASSERT_TRUE(read_rst.collection_id.empty());
ASSERT_TRUE(read_rst.partition_tag.empty());
ASSERT_EQ(read_rst.length, 0);
ASSERT_EQ(read_rst.data_size, 0);
// remove old files
buffer.RemoveOldFiles(record[3].lsn);
ASSERT_EQ(buffer.file_no_from_, file_no);
// clear written lsn and reset failed
buffer.mxlog_buffer_writer_.file_no = 0;
buffer.mxlog_buffer_writer_.buf_offset = 0;
ASSERT_FALSE(buffer.ResetWriteLsn(record[1].lsn));
// create an empty wal file, clear written lsn, and reset failed
FILE *fi = fopen(WAL_GTEST_PATH "5.wal", "w");
fclose(fi);
buffer.mxlog_buffer_writer_.file_no = 0;
buffer.mxlog_buffer_writer_.buf_offset = 0;
ASSERT_FALSE(buffer.ResetWriteLsn(record[1].lsn));
for (int i = 0; i < 4; i++) {
if (record[i].ids != nullptr) {
free((void*)record[i].ids);
}
if (record[i].data != nullptr) {
free((void*)record[i].data);
}
}
}
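The lsn packing exercised by this test, built as (uint64_t)file_no << 32 | buf_off and unpacked with lsn >> 32, can be summarized in a small sketch; the helper name is illustrative:

#include <cassert>
#include <cstdint>

uint64_t MakeLsn(uint32_t file_no, uint32_t buf_offset) {
    return (uint64_t)file_no << 32 | buf_offset;  // high 32 bits: wal file number, low 32 bits: byte offset
}

int main() {
    uint64_t lsn = MakeLsn(4, 100);
    assert(uint32_t(lsn >> 32) == 4);            // file number
    assert(uint32_t(lsn & 0xffffffff) == 100);   // offset within that file
    return 0;
}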
TEST(WalTest, MANAGER_INIT_TEST) {
MakeEmptyTestPath();

View File

@ -91,6 +91,7 @@ class RpcHandlerTest : public testing::Test {
milvus::server::Config::GetInstance().SetStorageConfigSecondaryPath("");
milvus::server::Config::GetInstance().SetCacheConfigCacheInsertData("");
milvus::server::Config::GetInstance().SetEngineConfigOmpThreadNum("");
milvus::server::Config::GetInstance().SetServerConfigPort("19531");
// serverConfig.SetValue(server::CONFIG_CLUSTER_MODE, "cluster");
// DBWrapper::GetInstance().GetInstance().StartService();

View File

@ -342,9 +342,6 @@ class TestClient : public oatpp::web::client::ApiClient {
API_CALL("POST", "/hybrid_collections/{collection_name}/entities", InsertEntity,
PATH(String, collection_name), BODY_STRING(String, body))
// API_CALL("POST", "/hybrid_collections/{collection_name}/vectors", HybridSearch,
// PATH(String, collection_name), BODY_STRING(String, body))
#include OATPP_CODEGEN_END(ApiClient)
};