mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
optimize merge process (#2419)
* optimize merge process Signed-off-by: groot <yihua.mo@zilliz.com> * typo Signed-off-by: groot <yihua.mo@zilliz.com> * refine code Signed-off-by: yhmo <yihua.mo@zilliz.com> * drop collection issue Signed-off-by: yhmo <yihua.mo@zilliz.com>
This commit is contained in:
parent
2982004e2b
commit
2d9b358e16
@ -17,14 +17,6 @@
|
|||||||
|
|
||||||
namespace milvus {
|
namespace milvus {
|
||||||
namespace engine {
|
namespace engine {
|
||||||
namespace {
|
|
||||||
struct {
|
|
||||||
bool
|
|
||||||
operator()(meta::SegmentSchema& left, meta::SegmentSchema& right) const {
|
|
||||||
return left.file_size_ > right.file_size_;
|
|
||||||
}
|
|
||||||
} CompareSegment;
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
Status
|
Status
|
||||||
MergeAdaptiveStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) {
|
MergeAdaptiveStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) {
|
||||||
@ -54,7 +46,10 @@ MergeAdaptiveStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesG
|
|||||||
}
|
}
|
||||||
|
|
||||||
// arrange files by file size in descending order
|
// arrange files by file size in descending order
|
||||||
std::sort(sort_files.begin(), sort_files.end(), CompareSegment);
|
std::sort(sort_files.begin(), sort_files.end(),
|
||||||
|
[](const meta::SegmentSchema& left, const meta::SegmentSchema& right) {
|
||||||
|
return left.file_size_ > right.file_size_;
|
||||||
|
});
|
||||||
|
|
||||||
// pick files to merge
|
// pick files to merge
|
||||||
int64_t index_file_size = sort_files[0].index_file_size_;
|
int64_t index_file_size = sort_files[0].index_file_size_;
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
#include "db/meta/MetaConsts.h"
|
#include "db/meta/MetaConsts.h"
|
||||||
#include "utils/Log.h"
|
#include "utils/Log.h"
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
@ -34,13 +35,39 @@ MergeLayeredStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGr
|
|||||||
{1UL << 30, meta::SegmentsSchema()}, // 1GB
|
{1UL << 30, meta::SegmentsSchema()}, // 1GB
|
||||||
};
|
};
|
||||||
|
|
||||||
meta::SegmentsSchema& files = files_holder.HoldFiles();
|
meta::SegmentsSchema sort_files = files_holder.HoldFiles();
|
||||||
|
// no need to merge single file
|
||||||
|
if (sort_files.size() < 2) {
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
// arrange files by file size in descending order
|
||||||
|
std::sort(sort_files.begin(), sort_files.end(),
|
||||||
|
[](const meta::SegmentSchema& left, const meta::SegmentSchema& right) {
|
||||||
|
return left.file_size_ > right.file_size_;
|
||||||
|
});
|
||||||
|
|
||||||
|
// prioritize picking files whose merged size is greater than index_file_size
|
||||||
|
// to avoid big files such as index_file_size = 1024, merged file size = 1280
|
||||||
|
int64_t index_file_size = sort_files[0].index_file_size_;
|
||||||
|
size_t biggest_size = sort_files[0].file_size_;
|
||||||
|
for (auto iter = sort_files.end() - 1; iter != sort_files.begin() + 1; --iter) {
|
||||||
|
if ((*iter).file_size_ + biggest_size > index_file_size) {
|
||||||
|
meta::SegmentsSchema temp_group = {*sort_files.begin(), *iter};
|
||||||
|
files_groups.emplace_back(temp_group);
|
||||||
|
sort_files.erase(iter);
|
||||||
|
sort_files.erase(sort_files.begin());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
meta::SegmentsSchema huge_files;
|
meta::SegmentsSchema huge_files;
|
||||||
// iterate from the end, because typically the files_holder gets files in order from largest to smallest
|
// put files to layers
|
||||||
for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) {
|
for (meta::SegmentsSchema::reverse_iterator iter = sort_files.rbegin(); iter != sort_files.rend(); ++iter) {
|
||||||
meta::SegmentSchema& file = *iter;
|
meta::SegmentSchema& file = *iter;
|
||||||
if (file.index_file_size_ > 0 && file.file_size_ > (size_t)(file.index_file_size_)) {
|
if (file.index_file_size_ > 0 && file.file_size_ > (size_t)(file.index_file_size_)) {
|
||||||
// file that no need to merge
|
// file that no need to merge
|
||||||
|
files_holder.UnmarkFile(file);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,6 +76,9 @@ DropCollectionRequest::OnExecute() {
|
|||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// step 4: flush to trigger CleanUpFilesWithTTL
|
||||||
|
status = DBWrapper::DB()->Flush();
|
||||||
|
|
||||||
rc.ElapseFromBegin("total cost");
|
rc.ElapseFromBegin("total cost");
|
||||||
} catch (std::exception& ex) {
|
} catch (std::exception& ex) {
|
||||||
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
|
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
|
||||||
|
Loading…
Reference in New Issue
Block a user