#1537 add interface open()/close() for IOReader/IOWriter

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
yudong.cai 2020-03-09 15:11:55 +08:00
parent 1d18c0a938
commit ea63f65ceb
12 changed files with 136 additions and 45 deletions

View File

@ -18,9 +18,8 @@ namespace storage {
class IOReader {
public:
explicit IOReader(const std::string& name) : name_(name) {
}
~IOReader() = default;
virtual void
open(const std::string& name) = 0;
virtual void
read(void* ptr, size_t size) = 0;
@ -31,8 +30,8 @@ class IOReader {
virtual size_t
length() = 0;
public:
std::string name_;
virtual void
close() = 0;
};
} // namespace storage

View File

@ -18,9 +18,8 @@ namespace storage {
class IOWriter {
public:
explicit IOWriter(const std::string& name) : name_(name), len_(0) {
}
~IOWriter() = default;
virtual void
open(const std::string& name) = 0;
virtual void
write(void* ptr, size_t size) = 0;
@ -28,9 +27,8 @@ class IOWriter {
virtual size_t
length() = 0;
public:
std::string name_;
size_t len_;
virtual void
close() = 0;
};
} // namespace storage

View File

@ -14,14 +14,12 @@
namespace milvus {
namespace storage {
DiskIOReader::DiskIOReader(const std::string& name) : IOReader(name) {
void
DiskIOReader::open(const std::string& name) {
name_ = name;
fs_ = std::fstream(name_, std::ios::in | std::ios::binary);
}
DiskIOReader::~DiskIOReader() {
fs_.close();
}
void
DiskIOReader::read(void* ptr, size_t size) {
fs_.read(reinterpret_cast<char*>(ptr), size);
@ -35,7 +33,15 @@ DiskIOReader::seekg(size_t pos) {
size_t
DiskIOReader::length() {
fs_.seekg(0, fs_.end);
return fs_.tellg();
size_t len = fs_.tellg();
fs_.seekg(0, fs_.beg);
return len;
}
void
DiskIOReader::close() {
fs_.close();
}
} // namespace storage
} // namespace milvus

View File

@ -20,8 +20,20 @@ namespace storage {
class DiskIOReader : public IOReader {
public:
explicit DiskIOReader(const std::string& name);
~DiskIOReader();
DiskIOReader() = default;
~DiskIOReader() = default;
// No copy and move
DiskIOReader(const DiskIOReader&) = delete;
DiskIOReader(DiskIOReader&&) = delete;
DiskIOReader&
operator=(const DiskIOReader&) = delete;
DiskIOReader&
operator=(DiskIOReader&&) = delete;
void
open(const std::string& name) override;
void
read(void* ptr, size_t size) override;
@ -32,7 +44,11 @@ class DiskIOReader : public IOReader {
size_t
length() override;
void
close() override;
public:
std::string name_;
std::fstream fs_;
};

View File

@ -14,14 +14,13 @@
namespace milvus {
namespace storage {
DiskIOWriter::DiskIOWriter(const std::string& name) : IOWriter(name) {
void
DiskIOWriter::open(const std::string& name) {
name_ = name;
len_ = 0;
fs_ = std::fstream(name_, std::ios::out | std::ios::binary);
}
DiskIOWriter::~DiskIOWriter() {
fs_.close();
}
void
DiskIOWriter::write(void* ptr, size_t size) {
fs_.write(reinterpret_cast<char*>(ptr), size);
@ -33,5 +32,10 @@ DiskIOWriter::length() {
return len_;
}
void
DiskIOWriter::close() {
fs_.close();
}
} // namespace storage
} // namespace milvus

View File

@ -20,8 +20,20 @@ namespace storage {
class DiskIOWriter : public IOWriter {
public:
explicit DiskIOWriter(const std::string& name);
~DiskIOWriter();
DiskIOWriter() = default;
~DiskIOWriter() = default;
// No copy and move
DiskIOWriter(const DiskIOWriter&) = delete;
DiskIOWriter(DiskIOWriter&&) = delete;
DiskIOWriter&
operator=(const DiskIOWriter&) = delete;
DiskIOWriter&
operator=(DiskIOWriter&&) = delete;
void
open(const std::string& name) override;
void
write(void* ptr, size_t size) override;
@ -29,7 +41,12 @@ class DiskIOWriter : public IOWriter {
size_t
length() override;
void
close() override;
public:
std::string name_;
size_t len_;
std::fstream fs_;
};

View File

@ -15,13 +15,13 @@
namespace milvus {
namespace storage {
S3IOReader::S3IOReader(const std::string& name) : IOReader(name), pos_(0) {
void
S3IOReader::open(const std::string& name) {
name_ = name;
pos_ = 0;
S3ClientWrapper::GetInstance().GetObjectStr(name_, buffer_);
}
S3IOReader::~S3IOReader() {
}
void
S3IOReader::read(void* ptr, size_t size) {
memcpy(ptr, buffer_.data() + pos_, size);
@ -37,5 +37,9 @@ S3IOReader::length() {
return buffer_.length();
}
void
S3IOReader::close() {
}
} // namespace storage
} // namespace milvus

View File

@ -19,8 +19,20 @@ namespace storage {
class S3IOReader : public IOReader {
public:
explicit S3IOReader(const std::string& name);
~S3IOReader();
S3IOReader() = default;
~S3IOReader() = default;
// No copy and move
S3IOReader(const S3IOReader&) = delete;
S3IOReader(S3IOReader&&) = delete;
S3IOReader&
operator=(const S3IOReader&) = delete;
S3IOReader&
operator=(S3IOReader&&) = delete;
void
open(const std::string& name) override;
void
read(void* ptr, size_t size) override;
@ -31,7 +43,11 @@ class S3IOReader : public IOReader {
size_t
length() override;
void
close() override;
public:
std::string name_;
std::string buffer_;
size_t pos_;
};

View File

@ -15,14 +15,13 @@
namespace milvus {
namespace storage {
S3IOWriter::S3IOWriter(const std::string& name) : IOWriter(name) {
void
S3IOWriter::open(const std::string& name) {
name_ = name;
len_ = 0;
buffer_ = "";
}
S3IOWriter::~S3IOWriter() {
S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_);
}
void
S3IOWriter::write(void* ptr, size_t size) {
buffer_ += std::string(reinterpret_cast<char*>(ptr), size);
@ -34,5 +33,10 @@ S3IOWriter::length() {
return len_;
}
void
S3IOWriter::close() {
S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_);
}
} // namespace storage
} // namespace milvus

View File

@ -19,8 +19,20 @@ namespace storage {
class S3IOWriter : public IOWriter {
public:
explicit S3IOWriter(const std::string& name);
~S3IOWriter();
S3IOWriter() = default;
~S3IOWriter() = default;
// No copy and move
S3IOWriter(const S3IOWriter&) = delete;
S3IOWriter(S3IOWriter&&) = delete;
S3IOWriter&
operator=(const S3IOWriter&) = delete;
S3IOWriter&
operator=(S3IOWriter&&) = delete;
void
open(const std::string& name) override;
void
write(void* ptr, size_t size) override;
@ -28,7 +40,12 @@ class S3IOWriter : public IOWriter {
size_t
length() override;
void
close() override;
public:
std::string name_;
size_t len_;
std::string buffer_;
};

View File

@ -179,12 +179,13 @@ read_index(const std::string& location) {
std::shared_ptr<storage::IOReader> reader_ptr;
if (s3_enable) {
reader_ptr = std::make_shared<storage::S3IOReader>(location);
reader_ptr = std::make_shared<storage::S3IOReader>();
} else {
reader_ptr = std::make_shared<storage::DiskIOReader>(location);
reader_ptr = std::make_shared<storage::DiskIOReader>();
}
recorder.RecordSection("Start");
reader_ptr->open(location);
size_t length = reader_ptr->length();
if (length <= 0) {
@ -226,6 +227,8 @@ read_index(const std::string& location) {
delete[] meta;
}
reader_ptr->close();
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
STORAGE_LOG_DEBUG << "read_index(" << location << ") rate " << rate << "MB/s";
@ -252,12 +255,13 @@ write_index(VecIndexPtr index, const std::string& location) {
std::shared_ptr<storage::IOWriter> writer_ptr;
if (s3_enable) {
writer_ptr = std::make_shared<storage::S3IOWriter>(location);
writer_ptr = std::make_shared<storage::S3IOWriter>();
} else {
writer_ptr = std::make_shared<storage::DiskIOWriter>(location);
writer_ptr = std::make_shared<storage::DiskIOWriter>();
}
recorder.RecordSection("Start");
writer_ptr->open(location);
writer_ptr->write(&index_type, sizeof(IndexType));
@ -273,6 +277,8 @@ write_index(VecIndexPtr index, const std::string& location) {
writer_ptr->write((void*)binary->data.get(), binary_length);
}
writer_ptr->close();
double span = recorder.RecordSection("End");
double rate = writer_ptr->length() * 1000000.0 / span / 1024 / 1024;
STORAGE_LOG_DEBUG << "write_index(" << location << ") rate " << rate << "MB/s";

View File

@ -90,15 +90,18 @@ TEST_F(StorageTest, S3_RW_TEST) {
ASSERT_TRUE(storage_inst.StartService().ok());
{
milvus::storage::S3IOWriter writer(index_name);
milvus::storage::S3IOWriter writer;
writer.open(index_name);
size_t len = content.length();
writer.write(&len, sizeof(len));
writer.write((void*)(content.data()), len);
ASSERT_TRUE(len + sizeof(len) == writer.length());
writer.close();
}
{
milvus::storage::S3IOReader reader(index_name);
milvus::storage::S3IOReader reader;
reader.open(index_name);
size_t length = reader.length();
size_t rp = 0;
reader.seekg(rp);
@ -120,6 +123,7 @@ TEST_F(StorageTest, S3_RW_TEST) {
}
ASSERT_TRUE(content == content_out);
reader.close();
}
storage_inst.StopService();