fix: centroids file not removed when data skew in major compaction (#34359)

issue: https://github.com/milvus-io/milvus/issues/30633
pr: #34050

Signed-off-by: chasingegg <chao.gao@zilliz.com>
This commit is contained in:
Gao 2024-07-05 10:42:10 +08:00 committed by GitHub
parent 74da97796b
commit 261b61e875
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 38 additions and 15 deletions

View File

@ -326,6 +326,8 @@ KmeansClustering::StreamingAssignandUpload(
if (IsDataSkew<T>(config, dim, num_vectors_each_centroid)) {
LOG_INFO(msg_header_ + "data skew! skip clustering");
// remove uploaded files
remote_paths_to_size[cluster_result_.centroid_path] =
cluster_result_.centroid_file_size;
RemoveClusteringResultFiles(file_manager_->GetChunkManager().get(),
remote_paths_to_size);
// skip clustering, nothing takes affect

View File

@ -78,10 +78,35 @@ transforConfigToPB(const Config& config) {
return analyze_info;
}
// when we skip clustering, nothing uploaded
template <typename T>
void
CheckResultEmpty(const milvus::clustering::KmeansClusteringPtr& clusteringJob,
const milvus::storage::ChunkManagerPtr cm,
int64_t segment_id,
int64_t segment_id2) {
std::string centroids_path_prefix =
clusteringJob->GetRemoteCentroidsObjectPrefix();
std::string centroid_path =
centroids_path_prefix + "/" + std::string(CENTROIDS_NAME);
ASSERT_FALSE(cm->Exist(centroid_path));
std::string offset_mapping_name = std::string(OFFSET_MAPPING_NAME);
std::string centroid_id_mapping_path =
clusteringJob->GetRemoteCentroidIdMappingObjectPrefix(segment_id) +
"/" + offset_mapping_name;
milvus::proto::clustering::ClusteringCentroidIdMappingStats mapping_stats;
std::string centroid_id_mapping_path2 =
clusteringJob->GetRemoteCentroidIdMappingObjectPrefix(segment_id2) +
"/" + offset_mapping_name;
ASSERT_FALSE(cm->Exist(centroid_id_mapping_path));
ASSERT_FALSE(cm->Exist(centroid_id_mapping_path2));
}
template <typename T>
void
CheckResultCorrectness(
const milvus::clustering::KmeansClusteringPtr& clusteringJob,
const milvus::storage::ChunkManagerPtr cm,
int64_t segment_id,
int64_t segment_id2,
int64_t dim,
@ -137,6 +162,10 @@ CheckResultCorrectness(
ASSERT_EQ(mapping_stats2.num_in_centroid(i), num_in_centroid[i]);
}
}
// remove files
cm->Remove(centroid_path);
cm->Remove(centroid_id_mapping_path);
cm->Remove(centroid_id_mapping_path2);
}
template <typename T, DataType dtype>
@ -196,7 +225,7 @@ test_run() {
Config config;
config["max_cluster_ratio"] = 10.0;
config["max_cluster_size"] = 5L * 1024 * 1024 * 1024;
auto clusteringJob = std::make_unique<clustering::KmeansClustering>(ctx);
// no need to sample train data
{
config["min_cluster_ratio"] = 0.01;
@ -205,10 +234,9 @@ test_run() {
config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
CheckResultCorrectness<T>(clusteringJob,
cm,
segment_id,
segment_id2,
dim,
@ -223,10 +251,9 @@ test_run() {
config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
CheckResultCorrectness<T>(clusteringJob,
cm,
segment_id,
segment_id2,
dim,
@ -244,11 +271,10 @@ test_run() {
config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
} catch (SegcoreError& e) {
ASSERT_EQ(e.get_error_code(), ErrorCode::ClusterSkip);
CheckResultEmpty<T>(clusteringJob, cm, segment_id, segment_id2);
throw e;
},
SegcoreError);
@ -264,11 +290,10 @@ test_run() {
config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
} catch (SegcoreError& e) {
ASSERT_EQ(e.get_error_code(), ErrorCode::ClusterSkip);
CheckResultEmpty<T>(clusteringJob, cm, segment_id, segment_id2);
throw e;
},
SegcoreError);
@ -282,11 +307,9 @@ test_run() {
config["train_size"] = 1536L * 1024; // 1.5MB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
CheckResultCorrectness<T>(clusteringJob,
cm,
segment_id,
segment_id2,
dim,
@ -302,11 +325,9 @@ test_run() {
config["train_size"] = 6L * 1024 * 1024; // 6MB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
CheckResultCorrectness<T>(clusteringJob,
cm,
segment_id,
segment_id2,
dim,