From fc8a6e0779a2c213834fd8edc9fe73ba2c797b7d Mon Sep 17 00:00:00 2001 From: Zhiru Zhu Date: Wed, 4 Mar 2020 21:10:11 +0800 Subject: [PATCH] Fix #1491 and #1504 (#1506) * update Signed-off-by: Zhiru Zhu * update Signed-off-by: Zhiru Zhu * update Signed-off-by: Zhiru Zhu * fix centos compile error Signed-off-by: Zhiru Zhu --- CHANGELOG.md | 2 + .../default/DefaultDeletedDocsFormat.cpp | 70 ++++++++++++++++--- .../codecs/default/DefaultVectorsFormat.cpp | 54 +++++++++++--- core/src/segment/SegmentReader.cpp | 21 +++--- core/src/segment/SegmentWriter.cpp | 31 ++++---- core/unittest/db/test_delete.cpp | 68 +++++++++++++++++- core/unittest/db/utils.h | 3 + 7 files changed, 199 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d598b46fce..469a69c059 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ Please mark all change in change log and use the issue from GitHub - \#1359 Negative distance value returned when searching with HNSW index type - \#1429 Server crashed when searching vectors using GPU - \#1484 Index type changed to IDMAP after compacted +- \#1491 Server crashed during adding vectors +- \#1504 Avoid possible race condition between delete and search ## Feature - \#216 Add CLI to get server info diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp index 774d8ea96e..fb211f6969 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp @@ -20,7 +20,9 @@ #include #include +#define BOOST_NO_CXX11_SCOPED_ENUMS #include +#undef BOOST_NO_CXX11_SCOPED_ENUMS #include #include #include @@ -46,12 +48,18 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } - auto file_size = boost::filesystem::file_size(boost::filesystem::path(del_file_path)); - auto deleted_docs_size = file_size / sizeof(segment::offset_t); + size_t num_bytes; + if (::read(del_fd, &num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to read from file: " + del_file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + auto deleted_docs_size = num_bytes / sizeof(segment::offset_t); std::vector deleted_docs_list; deleted_docs_list.resize(deleted_docs_size); - if (::read(del_fd, deleted_docs_list.data(), file_size) == -1) { + if (::read(del_fd, deleted_docs_list.data(), num_bytes) == -1) { std::string err_msg = "Failed to read from file: " + del_file_path + ", error: " + std::strerror(errno); ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_WRITE_ERROR, err_msg); @@ -73,27 +81,69 @@ DefaultDeletedDocsFormat::write(const store::DirectoryPtr& directory_ptr, const std::string dir_path = directory_ptr->GetDirPath(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; - // TODO(zhiru): append mode - int del_fd = open(del_file_path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 00664); + // Create a temporary file from the existing file + const std::string temp_path = dir_path + "/" + "temp_del"; + bool exists = boost::filesystem::exists(del_file_path); + if (exists) { + boost::filesystem::copy_file(del_file_path, temp_path, boost::filesystem::copy_option::fail_if_exists); + } + + // Write to the temp file, in order to avoid possible race condition with search (concurrent read and write) + int del_fd = open(temp_path.c_str(), O_RDWR | O_CREAT, 00664); if (del_fd == -1) { - std::string err_msg = "Failed to open file: " + del_file_path; + std::string err_msg = "Failed to open file: " + temp_path; ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } - auto deleted_docs_list = deleted_docs->GetDeletedDocs(); + size_t old_num_bytes; + if (exists) { + if (::read(del_fd, &old_num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to read from file: " + temp_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + } else { + old_num_bytes = 0; + } - if (::write(del_fd, deleted_docs_list.data(), sizeof(segment::offset_t) * deleted_docs->GetSize()) == -1) { - std::string err_msg = "Failed to write to file" + del_file_path + ", error: " + std::strerror(errno); + auto deleted_docs_list = deleted_docs->GetDeletedDocs(); + size_t new_num_bytes = old_num_bytes + sizeof(segment::offset_t) * deleted_docs->GetSize(); + + // rewind and overwrite with the new_num_bytes + int off = lseek(del_fd, 0, SEEK_SET); + if (off == -1) { + std::string err_msg = "Failed to seek file: " + temp_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + if (::write(del_fd, &new_num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to write to file" + temp_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + // Move to the end of file and append + off = lseek(del_fd, 0, SEEK_END); + if (off == -1) { + std::string err_msg = "Failed to seek file: " + temp_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + if (::write(del_fd, deleted_docs_list.data(), new_num_bytes) == -1) { + std::string err_msg = "Failed to write to file" + temp_path + ", error: " + std::strerror(errno); ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_WRITE_ERROR, err_msg); } if (::close(del_fd) == -1) { - std::string err_msg = "Failed to close file: " + del_file_path + ", error: " + std::strerror(errno); + std::string err_msg = "Failed to close file: " + temp_path + ", error: " + std::strerror(errno); ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_WRITE_ERROR, err_msg); } + + // Move temp file to delete file + boost::filesystem::rename(temp_path, del_file_path); } } // namespace codec diff --git a/core/src/codecs/default/DefaultVectorsFormat.cpp b/core/src/codecs/default/DefaultVectorsFormat.cpp index f0388e4d4d..61b72e8795 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.cpp +++ b/core/src/codecs/default/DefaultVectorsFormat.cpp @@ -53,7 +53,14 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } - size_t num_bytes = boost::filesystem::file_size(path); + + size_t num_bytes; + if (::read(rv_fd, &num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + std::vector vector_list; vector_list.resize(num_bytes); if (::read(rv_fd, vector_list.data(), num_bytes) == -1) { @@ -78,11 +85,18 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } - auto file_size = boost::filesystem::file_size(path); - auto count = file_size / sizeof(segment::doc_id_t); + + size_t num_bytes; + if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + auto count = num_bytes / sizeof(segment::doc_id_t); std::vector uids; uids.resize(count); - if (::read(uid_fd, uids.data(), file_size) == -1) { + if (::read(uid_fd, uids.data(), num_bytes) == -1) { std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_WRITE_ERROR, err_msg); @@ -146,8 +160,14 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm auto start = std::chrono::high_resolution_clock::now(); - if (::write(rv_fd, vectors->GetData().data(), vectors->GetData().size()) == -1) { - std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno); + size_t rv_num_bytes = vectors->GetData().size() * sizeof(uint8_t); + if (::write(rv_fd, &rv_num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + if (::write(rv_fd, vectors->GetData().data(), rv_num_bytes) == -1) { + std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno); ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_WRITE_ERROR, err_msg); } @@ -162,7 +182,14 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm ENGINE_LOG_DEBUG << "Writing raw vectors took " << diff.count() << " s"; start = std::chrono::high_resolution_clock::now(); - if (::write(uid_fd, vectors->GetUids().data(), sizeof(segment::doc_id_t) * vectors->GetCount()) == -1) { + + size_t uid_num_bytes = vectors->GetUids().size() * sizeof(segment::doc_id_t); + if (::write(uid_fd, &uid_num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + if (::write(uid_fd, vectors->GetUids().data(), uid_num_bytes) == -1) { std::string err_msg = "Failed to write to file" + uid_file_path + ", error: " + std::strerror(errno); ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_WRITE_ERROR, err_msg); @@ -202,10 +229,15 @@ DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::v ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } - auto file_size = boost::filesystem::file_size(path); - auto count = file_size / sizeof(segment::doc_id_t); + size_t num_bytes; + if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + auto count = num_bytes / sizeof(segment::doc_id_t); uids.resize(count); - if (::read(uid_fd, uids.data(), file_size) == -1) { + if (::read(uid_fd, uids.data(), num_bytes) == -1) { std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_WRITE_ERROR, err_msg); @@ -245,6 +277,8 @@ DefaultVectorsFormat::read_vectors(const store::DirectoryPtr& directory_ptr, off ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } + + offset += sizeof(size_t); // Beginning of file is num_bytes int off = lseek(rv_fd, offset, SEEK_SET); if (off == -1) { std::string err_msg = "Failed to seek file: " + path.string() + ", error: " + std::strerror(errno); diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index 143cc4abc1..f7e7c1af3b 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -22,7 +22,6 @@ #include "Vectors.h" #include "codecs/default/DefaultCodec.h" #include "store/Directory.h" -#include "utils/Exception.h" #include "utils/Log.h" namespace milvus { @@ -47,8 +46,8 @@ SegmentReader::Load() { directory_ptr_->Create(); default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_); default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_); - } catch (Exception& e) { - return Status(e.code(), e.what()); + } catch (std::exception& e) { + return Status(SERVER_WRITE_ERROR, e.what()); } return Status::OK(); } @@ -59,10 +58,10 @@ SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector& try { directory_ptr_->Create(); default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors); - } catch (Exception& e) { + } catch (std::exception& e) { std::string err_msg = "Failed to load raw vectors. " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; - return Status(e.code(), err_msg); + return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); } @@ -73,10 +72,10 @@ SegmentReader::LoadUids(std::vector& uids) { try { directory_ptr_->Create(); default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids); - } catch (Exception& e) { + } catch (std::exception& e) { std::string err_msg = "Failed to load uids. " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; - return Status(e.code(), err_msg); + return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); } @@ -93,10 +92,10 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { try { directory_ptr_->Create(); default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr); - } catch (Exception& e) { + } catch (std::exception& e) { std::string err_msg = "Failed to load bloom filter. " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; - return Status(e.code(), err_msg); + return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); } @@ -107,10 +106,10 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) { try { directory_ptr_->Create(); default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr); - } catch (Exception& e) { + } catch (std::exception& e) { std::string err_msg = "Failed to load deleted docs. " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; - return Status(e.code(), err_msg); + return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); } diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index fbb6cae707..1e8c509eeb 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -24,7 +24,6 @@ #include "Vectors.h" #include "codecs/default/DefaultCodec.h" #include "store/Directory.h" -#include "utils/Exception.h" #include "utils/Log.h" namespace milvus { @@ -87,10 +86,10 @@ SegmentWriter::WriteVectors() { try { directory_ptr_->Create(); default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_); - } catch (Exception& e) { - std::string err_msg = "Failed to write vectors. " + std::string(e.what()); + } catch (std::exception& e) { + std::string err_msg = "Failed to write vectors: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; - return Status(e.code(), err_msg); + return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); } @@ -127,10 +126,10 @@ SegmentWriter::WriteBloomFilter() { end = std::chrono::high_resolution_clock::now(); diff = end - start; ENGINE_LOG_DEBUG << "Writing bloom filter took " << diff.count() << " s"; - } catch (Exception& e) { - std::string err_msg = "Failed to write vectors. " + std::string(e.what()); + } catch (std::exception& e) { + std::string err_msg = "Failed to write vectors: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; - return Status(e.code(), err_msg); + return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); } @@ -142,10 +141,10 @@ SegmentWriter::WriteDeletedDocs() { directory_ptr_->Create(); DeletedDocsPtr deleted_docs_ptr = std::make_shared(); default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr); - } catch (Exception& e) { - std::string err_msg = "Failed to write deleted docs. " + std::string(e.what()); + } catch (std::exception& e) { + std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; - return Status(e.code(), err_msg); + return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); } @@ -156,10 +155,10 @@ SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) { try { directory_ptr_->Create(); default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs); - } catch (Exception& e) { - std::string err_msg = "Failed to write deleted docs. " + std::string(e.what()); + } catch (std::exception& e) { + std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; - return Status(e.code(), err_msg); + return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); } @@ -170,10 +169,10 @@ SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) { try { directory_ptr_->Create(); default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr); - } catch (Exception& e) { - std::string err_msg = "Failed to write bloom filter. " + std::string(e.what()); + } catch (std::exception& e) { + std::string err_msg = "Failed to write bloom filter: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; - return Status(e.code(), err_msg); + return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); } diff --git a/core/unittest/db/test_delete.cpp b/core/unittest/db/test_delete.cpp index 09675f4b4a..806a82e936 100644 --- a/core/unittest/db/test_delete.cpp +++ b/core/unittest/db/test_delete.cpp @@ -199,6 +199,68 @@ TEST_F(DeleteTest, delete_on_disk) { } } +TEST_F(DeleteTest, delete_multiple_times) { + milvus::engine::meta::TableSchema table_info = BuildTableSchema(); + auto stat = db_->CreateTable(table_info); + + milvus::engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = GetTableName(); + stat = db_->DescribeTable(table_info_get); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + int64_t nb = 100000; + milvus::engine::VectorsData xb; + BuildVectors(nb, xb); + + for (int64_t i = 0; i < nb; i++) { + xb.id_array_.push_back(i); + } + + stat = db_->InsertVectors(GetTableName(), "", xb); + ASSERT_TRUE(stat.ok()); + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dis(0, nb - 1); + + int64_t num_query = 10; + std::map search_vectors; + for (int64_t i = 0; i < num_query; ++i) { + int64_t index = dis(gen); + milvus::engine::VectorsData search; + search.vector_count_ = 1; + for (int64_t j = 0; j < TABLE_DIM; j++) { + search.float_data_.push_back(xb.float_data_[index * TABLE_DIM + j]); + } + search_vectors.insert(std::make_pair(xb.id_array_[index], search)); + } + + // std::this_thread::sleep_for(std::chrono::seconds(3)); // ensure raw data write to disk + stat = db_->Flush(); + ASSERT_TRUE(stat.ok()); + + int topk = 10, nprobe = 10; + for (auto& pair : search_vectors) { + std::vector to_delete{pair.first}; + stat = db_->DeleteVectors(GetTableName(), to_delete); + ASSERT_TRUE(stat.ok()); + + stat = db_->Flush(); + ASSERT_TRUE(stat.ok()); + + auto& search = pair.second; + + std::vector tags; + milvus::engine::ResultIds result_ids; + milvus::engine::ResultDistances result_distances; + stat = db_->Query(dummy_context_, GetTableName(), tags, topk, nprobe, search, result_ids, result_distances); + ASSERT_NE(result_ids[0], pair.first); + // ASSERT_LT(result_distances[0], 1e-4); + ASSERT_GT(result_distances[0], 1); + } +} + TEST_F(DeleteTest, delete_with_index) { milvus::engine::meta::TableSchema table_info = BuildTableSchema(); table_info.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IVFFLAT; @@ -456,7 +518,7 @@ TEST_F(DeleteTest, delete_add_auto_flush) { ASSERT_EQ(result_distances[0], std::numeric_limits::max()); } -TEST_F(DeleteTest, compact_basic) { +TEST_F(CompactTest, compact_basic) { milvus::engine::meta::TableSchema table_info = BuildTableSchema(); auto stat = db_->CreateTable(table_info); @@ -507,7 +569,7 @@ TEST_F(DeleteTest, compact_basic) { } } -TEST_F(DeleteTest, compact_with_index) { +TEST_F(CompactTest, compact_with_index) { milvus::engine::meta::TableSchema table_info = BuildTableSchema(); table_info.index_file_size_ = milvus::engine::ONE_KB; table_info.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IVFSQ8; @@ -591,7 +653,7 @@ TEST_F(DeleteTest, compact_with_index) { } } -TEST_F(DeleteTest, compact_non_existing_table) { +TEST_F(CompactTest, compact_non_existing_table) { auto status = db_->Compact("non_existing_table"); ASSERT_FALSE(status.ok()); } diff --git a/core/unittest/db/utils.h b/core/unittest/db/utils.h index 5b54d0c7cf..8c4abed4fe 100644 --- a/core/unittest/db/utils.h +++ b/core/unittest/db/utils.h @@ -148,6 +148,9 @@ class MemManagerTest2 : public DBTest {}; ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DeleteTest : public DBTest {}; +///////////////////////////////////////////////////////////////////////////////////////////////////////////////// +class CompactTest : public DBTest {}; + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// class SearchByIdTest : public DBTest {};