mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
MS-110 - Avoid huge file size
Former-commit-id: 53e865a853cc15b5ffe91159ab158d37ec7cb634
This commit is contained in:
parent
0c05cffa72
commit
fb8bf21611
@ -11,6 +11,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-89 - Fix compile failed, libgpufaiss.a link missing
|
||||
- MS-90 - Fix arch match incorrect on ARM
|
||||
- MS-99 - Fix compilation bug
|
||||
- MS-110 - Avoid huge file size
|
||||
|
||||
## Improvement
|
||||
- MS-82 - Update server startup welcome message
|
||||
|
@ -479,7 +479,7 @@ void DBImpl::StartCompactionTask() {
|
||||
}
|
||||
|
||||
//serialize memory data
|
||||
std::vector<std::string> temp_table_ids;
|
||||
std::set<std::string> temp_table_ids;
|
||||
mem_mgr_->Serialize(temp_table_ids);
|
||||
for(auto& id : temp_table_ids) {
|
||||
compact_table_ids_.insert(id);
|
||||
@ -550,7 +550,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
|
||||
ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
|
||||
" of size=" << index->PhysicalSize()/(1024*1024) << " M";
|
||||
|
||||
index->Cache();
|
||||
//current disable this line to avoid memory
|
||||
//index->Cache();
|
||||
|
||||
return status;
|
||||
}
|
||||
@ -670,7 +671,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
|
||||
<< index->PhysicalSize()/(1024*1024) << " M"
|
||||
<< " from file " << to_remove.file_id_;
|
||||
|
||||
index->Cache();
|
||||
//current disable this line to avoid memory
|
||||
//index->Cache();
|
||||
|
||||
} catch (std::exception& ex) {
|
||||
return Status::Error("Build index encounter exception", ex.what());
|
||||
@ -709,7 +711,7 @@ Status DBImpl::Size(uint64_t& result) {
|
||||
DBImpl::~DBImpl() {
|
||||
shutting_down_.store(true, std::memory_order_release);
|
||||
bg_timer_thread_.join();
|
||||
std::vector<std::string> ids;
|
||||
std::set<std::string> ids;
|
||||
mem_mgr_->Serialize(ids);
|
||||
}
|
||||
|
||||
|
@ -20,36 +20,54 @@ namespace engine {
|
||||
|
||||
MemVectors::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
|
||||
const meta::TableFileSchema& schema, const Options& options)
|
||||
: pMeta_(meta_ptr),
|
||||
: meta_(meta_ptr),
|
||||
options_(options),
|
||||
schema_(schema),
|
||||
pIdGenerator_(new SimpleIDGenerator()),
|
||||
pEE_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) {
|
||||
id_generator_(new SimpleIDGenerator()),
|
||||
active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) {
|
||||
}
|
||||
|
||||
|
||||
void MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
|
||||
Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
|
||||
if(active_engine_ == nullptr) {
|
||||
return Status::Error("index engine is null");
|
||||
}
|
||||
|
||||
auto start_time = METRICS_NOW_TIME;
|
||||
pIdGenerator_->GetNextIDNumbers(n_, vector_ids_);
|
||||
pEE_->AddWithIds(n_, vectors_, vector_ids_.data());
|
||||
id_generator_->GetNextIDNumbers(n_, vector_ids_);
|
||||
Status status = active_engine_->AddWithIds(n_, vectors_, vector_ids_.data());
|
||||
auto end_time = METRICS_NOW_TIME;
|
||||
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
|
||||
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_), static_cast<int>(schema_.dimension_), total_time);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
size_t MemVectors::Total() const {
|
||||
return pEE_->Count();
|
||||
size_t MemVectors::RowCount() const {
|
||||
if(active_engine_ == nullptr) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return active_engine_->Count();
|
||||
}
|
||||
|
||||
size_t MemVectors::ApproximateSize() const {
|
||||
return pEE_->Size();
|
||||
size_t MemVectors::Size() const {
|
||||
if(active_engine_ == nullptr) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return active_engine_->Size();
|
||||
}
|
||||
|
||||
Status MemVectors::Serialize(std::string& table_id) {
|
||||
if(active_engine_ == nullptr) {
|
||||
return Status::Error("index engine is null");
|
||||
}
|
||||
|
||||
table_id = schema_.table_id_;
|
||||
auto size = ApproximateSize();
|
||||
auto size = Size();
|
||||
auto start_time = METRICS_NOW_TIME;
|
||||
pEE_->Serialize();
|
||||
active_engine_->Serialize();
|
||||
auto end_time = METRICS_NOW_TIME;
|
||||
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
|
||||
schema_.size_ = size;
|
||||
@ -59,20 +77,20 @@ Status MemVectors::Serialize(std::string& table_id) {
|
||||
schema_.file_type_ = (size >= options_.index_trigger_size) ?
|
||||
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
|
||||
|
||||
auto status = pMeta_->UpdateTableFile(schema_);
|
||||
auto status = meta_->UpdateTableFile(schema_);
|
||||
|
||||
LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
|
||||
<< " file " << schema_.file_id_ << " of size " << (double)(pEE_->Size()) / (double)meta::M << " M";
|
||||
<< " file " << schema_.file_id_ << " of size " << (double)(active_engine_->Size()) / (double)meta::M << " M";
|
||||
|
||||
pEE_->Cache();
|
||||
active_engine_->Cache();
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
MemVectors::~MemVectors() {
|
||||
if (pIdGenerator_ != nullptr) {
|
||||
delete pIdGenerator_;
|
||||
pIdGenerator_ = nullptr;
|
||||
if (id_generator_ != nullptr) {
|
||||
delete id_generator_;
|
||||
id_generator_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
@ -81,20 +99,20 @@ MemVectors::~MemVectors() {
|
||||
*/
|
||||
MemManager::MemVectorsPtr MemManager::GetMemByTable(
|
||||
const std::string& table_id) {
|
||||
auto memIt = memMap_.find(table_id);
|
||||
if (memIt != memMap_.end()) {
|
||||
auto memIt = mem_id_map_.find(table_id);
|
||||
if (memIt != mem_id_map_.end()) {
|
||||
return memIt->second;
|
||||
}
|
||||
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = table_id;
|
||||
auto status = pMeta_->CreateTableFile(table_file);
|
||||
auto status = meta_->CreateTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
memMap_[table_id] = MemVectorsPtr(new MemVectors(pMeta_, table_file, options_));
|
||||
return memMap_[table_id];
|
||||
mem_id_map_[table_id] = MemVectorsPtr(new MemVectors(meta_, table_file, options_));
|
||||
return mem_id_map_[table_id];
|
||||
}
|
||||
|
||||
Status MemManager::InsertVectors(const std::string& table_id_,
|
||||
@ -114,37 +132,44 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id,
|
||||
if (mem == nullptr) {
|
||||
return Status::NotFound("Group " + table_id + " not found!");
|
||||
}
|
||||
mem->Add(n, vectors, vector_ids);
|
||||
|
||||
return Status::OK();
|
||||
//makesure each file size less than index_trigger_size
|
||||
if(mem->Size() > options_.index_trigger_size) {
|
||||
std::unique_lock<std::mutex> lock(serialization_mtx_);
|
||||
immu_mem_list_.push_back(mem);
|
||||
mem_id_map_.erase(table_id);
|
||||
return InsertVectorsNoLock(table_id, n, vectors, vector_ids);
|
||||
} else {
|
||||
return mem->Add(n, vectors, vector_ids);
|
||||
}
|
||||
}
|
||||
|
||||
Status MemManager::ToImmutable() {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
for (auto& kv: memMap_) {
|
||||
immMems_.push_back(kv.second);
|
||||
for (auto& kv: mem_id_map_) {
|
||||
immu_mem_list_.push_back(kv.second);
|
||||
}
|
||||
|
||||
memMap_.clear();
|
||||
mem_id_map_.clear();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MemManager::Serialize(std::vector<std::string>& table_ids) {
|
||||
Status MemManager::Serialize(std::set<std::string>& table_ids) {
|
||||
ToImmutable();
|
||||
std::unique_lock<std::mutex> lock(serialization_mtx_);
|
||||
std::string table_id;
|
||||
table_ids.clear();
|
||||
for (auto& mem : immMems_) {
|
||||
for (auto& mem : immu_mem_list_) {
|
||||
mem->Serialize(table_id);
|
||||
table_ids.push_back(table_id);
|
||||
table_ids.insert(table_id);
|
||||
}
|
||||
immMems_.clear();
|
||||
immu_mem_list_.clear();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MemManager::EraseMemVector(const std::string& table_id) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
memMap_.erase(table_id);
|
||||
mem_id_map_.erase(table_id);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <ctime>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
@ -32,11 +33,11 @@ public:
|
||||
explicit MemVectors(const std::shared_ptr<meta::Meta>&,
|
||||
const meta::TableFileSchema&, const Options&);
|
||||
|
||||
void Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
|
||||
Status Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
|
||||
|
||||
size_t Total() const;
|
||||
size_t RowCount() const;
|
||||
|
||||
size_t ApproximateSize() const;
|
||||
size_t Size() const;
|
||||
|
||||
Status Serialize(std::string& table_id);
|
||||
|
||||
@ -49,11 +50,11 @@ private:
|
||||
MemVectors(const MemVectors&) = delete;
|
||||
MemVectors& operator=(const MemVectors&) = delete;
|
||||
|
||||
MetaPtr pMeta_;
|
||||
MetaPtr meta_;
|
||||
Options options_;
|
||||
meta::TableFileSchema schema_;
|
||||
IDGenerator* pIdGenerator_;
|
||||
ExecutionEnginePtr pEE_;
|
||||
IDGenerator* id_generator_;
|
||||
ExecutionEnginePtr active_engine_;
|
||||
|
||||
}; // MemVectors
|
||||
|
||||
@ -66,14 +67,14 @@ public:
|
||||
using Ptr = std::shared_ptr<MemManager>;
|
||||
|
||||
MemManager(const std::shared_ptr<meta::Meta>& meta, const Options& options)
|
||||
: pMeta_(meta), options_(options) {}
|
||||
: meta_(meta), options_(options) {}
|
||||
|
||||
MemVectorsPtr GetMemByTable(const std::string& table_id);
|
||||
|
||||
Status InsertVectors(const std::string& table_id,
|
||||
size_t n, const float* vectors, IDNumbers& vector_ids);
|
||||
|
||||
Status Serialize(std::vector<std::string>& table_ids);
|
||||
Status Serialize(std::set<std::string>& table_ids);
|
||||
|
||||
Status EraseMemVector(const std::string& table_id);
|
||||
|
||||
@ -82,11 +83,11 @@ private:
|
||||
size_t n, const float* vectors, IDNumbers& vector_ids);
|
||||
Status ToImmutable();
|
||||
|
||||
using MemMap = std::map<std::string, MemVectorsPtr>;
|
||||
using ImmMemPool = std::vector<MemVectorsPtr>;
|
||||
MemMap memMap_;
|
||||
ImmMemPool immMems_;
|
||||
MetaPtr pMeta_;
|
||||
using MemIdMap = std::map<std::string, MemVectorsPtr>;
|
||||
using MemList = std::vector<MemVectorsPtr>;
|
||||
MemIdMap mem_id_map_;
|
||||
MemList immu_mem_list_;
|
||||
MetaPtr meta_;
|
||||
Options options_;
|
||||
std::mutex mutex_;
|
||||
std::mutex serialization_mtx_;
|
||||
|
Loading…
Reference in New Issue
Block a user