Optimize large memory usage of Variant used in InsertRecord (#19197)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2022-09-16 10:52:47 +08:00 committed by GitHub
parent 5cdd6ca251
commit 58d3a49b62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 116 additions and 29 deletions

View File

@ -72,9 +72,6 @@ using VectorArray = proto::schema::VectorField;
using IdArray = proto::schema::IDs; using IdArray = proto::schema::IDs;
using InsertData = proto::segcore::InsertRecord; using InsertData = proto::segcore::InsertRecord;
using PkType = std::variant<std::monostate, int64_t, std::string>; using PkType = std::variant<std::monostate, int64_t, std::string>;
// tbb::concurrent_unordered_multimap equal_range too slow when multi repeated key
// using Pk2OffsetType = tbb::concurrent_unordered_multimap<PkType, int64_t, std::hash<PkType>>;
using Pk2OffsetType = std::unordered_map<PkType, std::vector<int64_t>, std::hash<PkType>>;
inline bool inline bool
IsPrimaryKeyDataType(DataType data_type) { IsPrimaryKeyDataType(DataType data_type) {

View File

@ -9,16 +9,30 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License // or implied. See the License for the specific language governing permissions and limitations under the License
#include <sys/timeb.h>
#include "InsertRecord.h" #include "InsertRecord.h"
namespace milvus::segcore { namespace milvus::segcore {
InsertRecord::InsertRecord(const Schema& schema, int64_t size_per_chunk) InsertRecord::InsertRecord(const Schema& schema, int64_t size_per_chunk)
: row_ids_(size_per_chunk), timestamps_(size_per_chunk) { : row_ids_(size_per_chunk), timestamps_(size_per_chunk) {
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();
for (auto& field : schema) { for (auto& field : schema) {
auto field_id = field.first; auto field_id = field.first;
auto& field_meta = field.second; auto& field_meta = field.second;
if (pk2offset_ == nullptr && pk_field_id.has_value() && pk_field_id.value() == field_id) {
switch (field_meta.get_data_type()) {
case DataType::INT64: {
pk2offset_ = std::make_unique<OffsetHashMap<int64_t>>();
break;
}
case DataType::VARCHAR: {
pk2offset_ = std::make_unique<OffsetHashMap<std::string>>();
break;
}
}
}
if (field_meta.is_vector()) { if (field_meta.is_vector()) {
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
this->append_field_data<FloatVector>(field_id, field_meta.get_dim(), size_per_chunk); this->append_field_data<FloatVector>(field_id, field_meta.get_dim(), size_per_chunk);

View File

@ -24,6 +24,68 @@
namespace milvus::segcore { namespace milvus::segcore {
class OffsetMap {
public:
virtual ~OffsetMap() = default;
virtual std::vector<SegOffset>
find_with_timestamp(const PkType pk, Timestamp timestamp, const ConcurrentVector<Timestamp>& timestamps) const = 0;
virtual std::vector<SegOffset>
find_with_barrier(const PkType pk, int64_t barrier) const = 0;
virtual void
insert(const PkType pk, int64_t offset) = 0;
virtual bool
empty() const = 0;
};
template <typename T>
class OffsetHashMap : public OffsetMap {
public:
std::vector<SegOffset>
find_with_timestamp(const PkType pk, Timestamp timestamp, const ConcurrentVector<Timestamp>& timestamps) const {
std::vector<SegOffset> res_offsets;
auto offset_iter = map_.find(std::get<T>(pk));
if (offset_iter != map_.end()) {
for (auto offset : offset_iter->second) {
if (timestamps[offset] <= timestamp) {
res_offsets.push_back(SegOffset(offset));
}
}
}
return res_offsets;
}
std::vector<SegOffset>
find_with_barrier(const PkType pk, int64_t barrier) const {
std::vector<SegOffset> res_offsets;
auto offset_iter = map_.find(std::get<T>(pk));
if (offset_iter != map_.end()) {
for (auto offset : offset_iter->second) {
if (offset <= barrier) {
res_offsets.push_back(SegOffset(offset));
}
}
}
return res_offsets;
}
void
insert(const PkType pk, int64_t offset) {
map_[std::get<T>(pk)].emplace_back(offset);
}
bool
empty() const {
return map_.empty();
}
private:
std::unordered_map<T, std::vector<int64_t>> map_;
};
struct InsertRecord { struct InsertRecord {
ConcurrentVector<Timestamp> timestamps_; ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<idx_t> row_ids_; ConcurrentVector<idx_t> row_ids_;
@ -36,52 +98,32 @@ struct InsertRecord {
TimestampIndex timestamp_index_; TimestampIndex timestamp_index_;
// pks to row offset // pks to row offset
Pk2OffsetType pk2offset_; std::unique_ptr<OffsetMap> pk2offset_;
explicit InsertRecord(const Schema& schema, int64_t size_per_chunk); explicit InsertRecord(const Schema& schema, int64_t size_per_chunk);
std::vector<SegOffset> std::vector<SegOffset>
search_pk(const PkType pk, Timestamp timestamp) const { search_pk(const PkType pk, Timestamp timestamp) const {
std::shared_lock lck(shared_mutex_); std::shared_lock lck(shared_mutex_);
std::vector<SegOffset> res_offsets; return pk2offset_->find_with_timestamp(pk, timestamp, timestamps_);
auto offset_iter = pk2offset_.find(pk);
if (offset_iter != pk2offset_.end()) {
for (auto offset : offset_iter->second) {
if (timestamps_[offset] <= timestamp) {
res_offsets.push_back(SegOffset(offset));
}
}
}
return res_offsets;
} }
std::vector<SegOffset> std::vector<SegOffset>
search_pk(const PkType pk, int64_t insert_barrier) const { search_pk(const PkType pk, int64_t insert_barrier) const {
std::shared_lock lck(shared_mutex_); std::shared_lock lck(shared_mutex_);
std::vector<SegOffset> res_offsets; return pk2offset_->find_with_barrier(pk, insert_barrier);
auto offset_iter = pk2offset_.find(pk);
if (offset_iter != pk2offset_.end()) {
for (auto offset : offset_iter->second) {
if (offset < insert_barrier) {
res_offsets.push_back(SegOffset(offset));
}
}
}
return res_offsets;
} }
void void
insert_pk(const PkType pk, int64_t offset) { insert_pk(const PkType pk, int64_t offset) {
std::lock_guard lck(shared_mutex_); std::lock_guard lck(shared_mutex_);
pk2offset_[pk].emplace_back(offset); pk2offset_->insert(pk, offset);
} }
bool bool
empty_pks() const { empty_pks() const {
std::shared_lock lck(shared_mutex_); std::shared_lock lck(shared_mutex_);
return pk2offset_.empty(); return pk2offset_->empty();
} }
// get field data without knowing the type // get field data without knowing the type

View File

@ -79,3 +79,37 @@ TEST(SegmentCoreTest, SmallIndex) {
schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
schema->AddDebugField("age", DataType::INT32); schema->AddDebugField("age", DataType::INT32);
} }
TEST(OffsetMap, int64_t){
using namespace milvus::segcore;
OffsetMap *map= new OffsetHashMap<int64_t>();
map->insert(PkType(int64_t(10)), 3);
std::vector<SegOffset> offset = map->find_with_barrier(PkType(int64_t(10)),10);
ASSERT_EQ(offset[0].get(), int64_t(3));
}
TEST(InsertRecordTest, int64_t){
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
auto i64_fid = schema->AddDebugField("age", DataType::INT64);
schema->set_primary_field_id(i64_fid);
auto record = milvus::segcore::InsertRecord(*schema, int64_t(32));
record.insert_pk(PkType(int64_t(12)), int64_t(3));
std::vector<SegOffset> offset = record.search_pk(PkType(int64_t(12)), int64_t(10));
ASSERT_EQ(offset[0].get(), int64_t(3));
}
TEST(InsertRecordTest, string){
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
auto i64_fid = schema->AddDebugField("name", DataType::VARCHAR);
schema->set_primary_field_id(i64_fid);
auto record = milvus::segcore::InsertRecord(*schema, int64_t(32));
record.insert_pk(PkType(std::string("test")), int64_t(3));
std::vector<SegOffset> offset = record.search_pk(PkType(std::string("test")), int64_t(10));
ASSERT_EQ(offset[0].get(), int64_t(3));
}