mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
Add count row for proxy
Signed-off-by: shengjh <1572099106@qq.com>
This commit is contained in:
parent
bd696486c9
commit
7812eed091
@ -16,7 +16,7 @@ master:
|
|||||||
etcd:
|
etcd:
|
||||||
address: localhost
|
address: localhost
|
||||||
port: 2379
|
port: 2379
|
||||||
rootpath: by-dev/
|
rootpath: by-dev
|
||||||
segthreshold: 10000
|
segthreshold: 10000
|
||||||
|
|
||||||
timesync:
|
timesync:
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
#include "nlohmann/json.hpp"
|
#include "nlohmann/json.hpp"
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <google/protobuf/text_format.h>
|
#include <google/protobuf/text_format.h>
|
||||||
|
#include <boost/filesystem.hpp>
|
||||||
|
|
||||||
using Collection = masterpb::Collection;
|
using Collection = masterpb::Collection;
|
||||||
using Schema = milvus::grpc::Schema;
|
using Schema = milvus::grpc::Schema;
|
||||||
@ -23,6 +24,7 @@ void ParseSegmentInfo(const std::string &json_str, SegmentInfo &segment_info) {
|
|||||||
segment_info.set_close_timestamp(json["close_timestamp"].get<uint64_t>());
|
segment_info.set_close_timestamp(json["close_timestamp"].get<uint64_t>());
|
||||||
segment_info.set_collection_id(json["collection_id"].get<uint64_t>());
|
segment_info.set_collection_id(json["collection_id"].get<uint64_t>());
|
||||||
segment_info.set_collection_name(json["collection_name"].get<std::string>());
|
segment_info.set_collection_name(json["collection_name"].get<std::string>());
|
||||||
|
segment_info.set_rows(json["rows"].get<std::int64_t>());
|
||||||
}
|
}
|
||||||
|
|
||||||
void ParseCollectionSchema(const std::string &json_str, Collection &collection) {
|
void ParseCollectionSchema(const std::string &json_str, Collection &collection) {
|
||||||
@ -51,8 +53,8 @@ MetaWrapper &MetaWrapper::GetInstance() {
|
|||||||
Status MetaWrapper::Init() {
|
Status MetaWrapper::Init() {
|
||||||
try {
|
try {
|
||||||
etcd_root_path_ = config.etcd.rootpath();
|
etcd_root_path_ = config.etcd.rootpath();
|
||||||
segment_path_ = etcd_root_path_ + "segment/";
|
segment_path_ = (boost::filesystem::path(etcd_root_path_) / "segment/").string();
|
||||||
collection_path_ = etcd_root_path_ + "collection/";
|
collection_path_ = (boost::filesystem::path(etcd_root_path_) / "collection/").string();
|
||||||
|
|
||||||
auto master_addr = config.master.address() + ":" + std::to_string(config.master.port());
|
auto master_addr = config.master.address() + ":" + std::to_string(config.master.port());
|
||||||
master_client_ = std::make_shared<milvus::master::GrpcClient>(master_addr);
|
master_client_ = std::make_shared<milvus::master::GrpcClient>(master_addr);
|
||||||
@ -65,7 +67,6 @@ Status MetaWrapper::Init() {
|
|||||||
UpdateMeta(res);
|
UpdateMeta(res);
|
||||||
};
|
};
|
||||||
watcher_ = std::make_shared<milvus::master::Watcher>(etcd_addr, segment_path_, f, true);
|
watcher_ = std::make_shared<milvus::master::Watcher>(etcd_addr, segment_path_, f, true);
|
||||||
|
|
||||||
SyncMeta();
|
SyncMeta();
|
||||||
}
|
}
|
||||||
catch (const std::exception &e) {
|
catch (const std::exception &e) {
|
||||||
@ -162,5 +163,16 @@ Status MetaWrapper::SyncMeta() {
|
|||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t MetaWrapper::CountCollection(const std::string &collection_name) {
|
||||||
|
uint64_t count = 0;
|
||||||
|
// TODO: index to speed up
|
||||||
|
for (const auto& segment_info : segment_infos_){
|
||||||
|
if (segment_info.second.collection_name() == collection_name){
|
||||||
|
count += segment_info.second.rows();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -31,6 +31,9 @@ class MetaWrapper {
|
|||||||
Status
|
Status
|
||||||
SyncMeta();
|
SyncMeta();
|
||||||
|
|
||||||
|
int64_t
|
||||||
|
CountCollection(const std::string& collection_name);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool IsCollectionMetaKey(const std::string &key);
|
bool IsCollectionMetaKey(const std::string &key);
|
||||||
|
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
#include "server/ValidationUtil.h"
|
#include "server/ValidationUtil.h"
|
||||||
#include "utils/Log.h"
|
#include "utils/Log.h"
|
||||||
#include "utils/TimeRecorder.h"
|
#include "utils/TimeRecorder.h"
|
||||||
|
#include "server/MetaWrapper.h"
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
@ -36,7 +37,7 @@ CountEntitiesReq::OnExecute() {
|
|||||||
try {
|
try {
|
||||||
std::string hdr = "CountEntitiesReq(collection=" + collection_name_ + ")";
|
std::string hdr = "CountEntitiesReq(collection=" + collection_name_ + ")";
|
||||||
TimeRecorderAuto rc(hdr);
|
TimeRecorderAuto rc(hdr);
|
||||||
|
row_count_ = MetaWrapper::GetInstance().CountCollection(collection_name_);
|
||||||
|
|
||||||
rc.ElapseFromBegin("done");
|
rc.ElapseFromBegin("done");
|
||||||
} catch (std::exception& ex) {
|
} catch (std::exception& ex) {
|
||||||
|
29
sdk/examples/simple/CountCollection.cpp
Normal file
29
sdk/examples/simple/CountCollection.cpp
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
#include <Status.h>
|
||||||
|
#include <Field.h>
|
||||||
|
#include <MilvusApi.h>
|
||||||
|
#include <interface/ConnectionImpl.h>
|
||||||
|
#include "utils/Utils.h"
|
||||||
|
|
||||||
|
int main(int argc , char**argv) {
|
||||||
|
|
||||||
|
TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
|
||||||
|
if (!parameters.is_valid) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
auto client = milvus::ConnectionImpl();
|
||||||
|
milvus::ConnectParam connect_param;
|
||||||
|
connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1" : parameters.address_;
|
||||||
|
connect_param.port = parameters.port_.empty() ? "19530" : parameters.port_;
|
||||||
|
client.Connect(connect_param);
|
||||||
|
|
||||||
|
milvus::Status stat;
|
||||||
|
const std::string collectin_name = "collection1";
|
||||||
|
|
||||||
|
int64_t count = 0;
|
||||||
|
stat = client.CountEntities(collectin_name, count);
|
||||||
|
if (!stat.ok()){
|
||||||
|
std::cerr << "Error: " << stat.message() << std::endl;
|
||||||
|
}
|
||||||
|
std::cout << "Collection " << collectin_name << " rows: " << count << std::endl;
|
||||||
|
|
||||||
|
}
|
@ -18,7 +18,7 @@ int main(int argc , char**argv) {
|
|||||||
client.Connect(connect_param);
|
client.Connect(connect_param);
|
||||||
|
|
||||||
milvus::Status stat;
|
milvus::Status stat;
|
||||||
const std::string collectin_name = "collection0";
|
const std::string collectin_name = "collection1";
|
||||||
|
|
||||||
// Create
|
// Create
|
||||||
milvus::FieldPtr field_ptr1 = std::make_shared<milvus::Field>();
|
milvus::FieldPtr field_ptr1 = std::make_shared<milvus::Field>();
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
#include "interface/ConnectionImpl.h"
|
#include "interface/ConnectionImpl.h"
|
||||||
#include "utils/Utils.h"
|
#include "utils/Utils.h"
|
||||||
|
|
||||||
const std::string COLLECTION = "collection_0";
|
const std::string COLLECTION = "collection1";
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
|
TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
|
||||||
|
@ -32,7 +32,7 @@ main(int argc, char *argv[]) {
|
|||||||
delete_ids.push_back(1);
|
delete_ids.push_back(1);
|
||||||
delete_ids.push_back(2);
|
delete_ids.push_back(2);
|
||||||
delete_ids.push_back(3);
|
delete_ids.push_back(3);
|
||||||
client.DeleteEntityByID("collection0", delete_ids);
|
client.DeleteEntityByID("collection1", delete_ids);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user