Merge branch 'jinhai' of 192.168.1.105:jinhai/vecwise_engine into jinhai

Former-commit-id: ec359e3f7b884e87cd3d629ac7bf722a9b0b3ef5
This commit is contained in:
groot 2019-04-17 12:15:25 +08:00
commit 8d163abd42
9 changed files with 76 additions and 35 deletions

1
.gitignore vendored
View File

@ -9,6 +9,7 @@ cmake_build
*.o
*.lo
*.tar.gz
*.log
cpp/third_party/thrift-0.12.0/
cpp/third_party/faiss-1.5.1

View File

@ -5,6 +5,7 @@
#include <faiss/IndexFlat.h>
#include <faiss/MetaIndexes.h>
#include <faiss/index_io.h>
#include <faiss/AutoTune.h>
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
@ -113,21 +114,20 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
return status;
}
faiss::IndexFlat innerIndex(group_file.dimension);
faiss::IndexIDMap index(&innerIndex);
std::shared_ptr<faiss::Index> index(faiss::index_factory(group_file.dimension, "IDMap,Flat"));
meta::GroupFilesSchema updated;
for (auto& file : files) {
auto file_index = dynamic_cast<faiss::IndexIDMap*>(faiss::read_index(file.location.c_str()));
index.add_with_ids(file_index->ntotal, dynamic_cast<faiss::IndexFlat*>(file_index->index)->xb.data(),
index->add_with_ids(file_index->ntotal, dynamic_cast<faiss::IndexFlat*>(file_index->index)->xb.data(),
file_index->id_map.data());
auto file_schema = file;
file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
updated.push_back(file_schema);
}
faiss::write_index(&index, group_file.location.c_str());
faiss::write_index(index.get(), group_file.location.c_str());
group_file.file_type = meta::GroupFileSchema::RAW;
updated.push_back(group_file);
status = _pMeta->update_files(updated);
@ -166,14 +166,14 @@ Status DBImpl::background_merge_files(const std::string& group_id) {
Status DBImpl::build_index(const meta::GroupFileSchema& file) {
//PXU TODO
std::cout << ">>Building Index for: " << file.location << std::endl;
return Status::OK();
}
Status DBImpl::background_build_index() {
assert(bg_build_index_started_);
meta::GroupFilesSchema to_index_files;
// PXU TODO
/* _pMeta->files_to_index(to_index_files); */
_pMeta->files_to_index(to_index_files);
Status status;
for (auto& file : to_index_files) {
status = build_index(file);

View File

@ -1,6 +1,9 @@
#include <sys/stat.h>
#include <unistd.h>
#include <sstream>
#include <iostream>
#include <boost/filesystem.hpp>
#include <fstream>
#include "DBMetaImpl.h"
#include "IDGenerator.h"
@ -9,6 +12,13 @@ namespace vecwise {
namespace engine {
namespace meta {
long GetFileSize(const std::string& filename)
{
struct stat stat_buf;
int rc = stat(filename.c_str(), &stat_buf);
return rc == 0 ? stat_buf.st_size : -1;
}
DBMetaImpl::DBMetaImpl(const MetaOptions& options_)
: _options(static_cast<const DBMetaOptions&>(options_)) {
initialize();
@ -57,6 +67,24 @@ Status DBMetaImpl::add_group_file(const std::string& group_id,
return Status::OK();
}
Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
// PXU TODO
files.clear();
std::stringstream ss;
ss << "/tmp/test/" << Meta::GetDate();
boost::filesystem::path path(ss.str().c_str());
boost::filesystem::directory_iterator end_itr;
for (boost::filesystem::directory_iterator itr(path); itr != end_itr; ++itr) {
std::cout << itr->path().string() << std::endl;
GroupFileSchema f;
f.location = itr->path().string();
if (1024*1024*50 >= GetFileSize(f.location)) continue;
std::cout << "About to index " << f.location << std::endl;
files.push_back(f);
}
return Status::OK();
}
Status DBMetaImpl::files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) {
//PXU TODO
@ -72,6 +100,8 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
std::cout << itr->path().string() << std::endl;
GroupFileSchema f;
f.location = itr->path().string();
if (1024*1024*50 < GetFileSize(f.location)) continue;
std::cout << "About to merge " << f.location << std::endl;
files[date].push_back(f);
}

View File

@ -41,6 +41,8 @@ public:
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) override;
virtual Status files_to_index(GroupFilesSchema&) override;
private:
Status initialize();

View File

@ -1,10 +1,10 @@
#include <faiss/IndexFlat.h>
#include <faiss/MetaIndexes.h>
#include <faiss/index_io.h>
#include <faiss/AutoTune.h>
#include <iostream>
#include <sstream>
#include <thread>
#include <wrapper/Index.h>
#include "MemManager.h"
#include "Meta.h"
@ -19,20 +19,19 @@ MemVectors::MemVectors(const std::string& group_id,
_file_location(file_location),
_pIdGenerator(new SimpleIDGenerator()),
_dimension(dimension),
_pInnerIndex(new faiss::IndexFlat(_dimension)),
_pIdMapIndex(new faiss::IndexIDMap(_pInnerIndex)) {
pIndex_(faiss::index_factory(_dimension, "IDMap,Flat")) {
}
void MemVectors::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
_pIdGenerator->getNextIDNumbers(n_, vector_ids_);
_pIdMapIndex->add_with_ids(n_, vectors_, &vector_ids_[0]);
pIndex_->add_with_ids(n_, vectors_, &vector_ids_[0]);
for(auto i=0 ; i<n_; i++) {
vector_ids_.push_back(i);
}
}
size_t MemVectors::total() const {
return _pIdMapIndex->ntotal;
return pIndex_->ntotal;
}
size_t MemVectors::approximate_size() const {
@ -42,10 +41,11 @@ size_t MemVectors::approximate_size() const {
Status MemVectors::serialize(std::string& group_id) {
/* std::stringstream ss; */
/* ss << "/tmp/test/" << _pIdGenerator->getNextIDNumber(); */
/* faiss::write_index(_pIdMapIndex, ss.str().c_str()); */
/* std::cout << _pIdMapIndex->ntotal << std::endl; */
/* faiss::write_index(pIndex_, ss.str().c_str()); */
/* std::cout << pIndex_->ntotal << std::endl; */
/* std::cout << _file_location << std::endl; */
faiss::write_index(_pIdMapIndex, _file_location.c_str());
/* faiss::write_index(pIndex_, _file_location.c_str()); */
write_index(pIndex_, _file_location.c_str());
group_id = group_id_;
return Status::OK();
}
@ -55,13 +55,9 @@ MemVectors::~MemVectors() {
delete _pIdGenerator;
_pIdGenerator = nullptr;
}
if (_pIdMapIndex != nullptr) {
delete _pIdMapIndex;
_pIdMapIndex = nullptr;
}
if (_pInnerIndex != nullptr) {
delete _pInnerIndex;
_pInnerIndex = nullptr;
if (pIndex_ != nullptr) {
delete pIndex_;
pIndex_ = nullptr;
}
}

View File

@ -10,7 +10,6 @@
#include "Status.h"
namespace faiss {
class IndexIDMap;
class Index;
}
@ -50,8 +49,7 @@ private:
const std::string _file_location;
IDGenerator* _pIdGenerator;
size_t _dimension;
faiss::Index* _pInnerIndex;
faiss::IndexIDMap* _pIdMapIndex;
faiss::Index* pIndex_;
}; // MemVectors

View File

@ -77,6 +77,8 @@ public:
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) = 0;
virtual Status files_to_index(GroupFilesSchema&) = 0;
static DateT GetDate(const std::time_t& t);
static DateT GetDate();

View File

@ -21,9 +21,11 @@ IndexBuilder::IndexBuilder(const Operand_ptr &opd) {
opd_ = opd;
}
Index_ptr IndexBuilder::build_all(const long &nb, const vector<float> &xb,
const vector<long> &ids,
const long &nt, const vector<float> &xt) {
Index_ptr IndexBuilder::build_all(const long &nb,
const float* xb,
const long* ids,
const long &nt,
const float* xt) {
std::shared_ptr<faiss::Index> index = nullptr;
index.reset(faiss::index_factory(opd_->d, opd_->index_type.c_str()));
@ -31,14 +33,20 @@ Index_ptr IndexBuilder::build_all(const long &nb, const vector<float> &xb,
// currently only cpu resources are used.
std::lock_guard<std::mutex> lk(cpu_resource);
if (!index->is_trained) {
nt == 0 || xt.empty() ? index->train(nb, xb.data())
: index->train(nt, xt.data());
nt == 0 || xt == nullptr ? index->train(nb, xb)
: index->train(nt, xt);
}
index->add(nb, xb.data());
index->add_with_ids(nb, xb.data(), ids.data()); // todo(linxj): support add_with_idmap
index->add_with_ids(nb, xb, ids); // todo(linxj): support add_with_idmap
}
return std::make_shared<Index>(index);
}
Index_ptr IndexBuilder::build_all(const long &nb, const vector<float> &xb,
const vector<long> &ids,
const long &nt, const vector<float> &xt) {
return build_all(nb, xb.data(), ids.data(), nt, xt.data());
}
// Be Factory pattern later

View File

@ -19,6 +19,12 @@ class IndexBuilder {
public:
explicit IndexBuilder(const Operand_ptr &opd);
Index_ptr build_all(const long &nb,
const float* xb,
const long* ids,
const long &nt = 0,
const float* xt = nullptr);
Index_ptr build_all(const long &nb,
const std::vector<float> &xb,
const std::vector<long> &ids,
@ -47,5 +53,3 @@ extern IndexBuilderPtr GetIndexBuilder(const Operand_ptr &opd);
}
}
}