mirror of
synced 2024-12-01 19:39:21 +08:00
Add gcNode to write node
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
@ -36,14 +36,6 @@ services:
- milvus
image: jaegertracing/all-in-one:latest
- "6831:6831/udp"
- "16686:16686"
- milvus
@ -83,10 +83,5 @@ services:
- milvus
image: jaegertracing/all-in-one:latest
- milvus
@ -4,17 +4,14 @@ go 1.15
require (
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/apache/pulsar-client-go v0.1.1
github.com/apache/thrift v0.13.0
github.com/aws/aws-sdk-go v1.30.8 // indirect
github.com/aws/aws-sdk-go v1.30.8
github.com/coreos/etcd v3.3.25+incompatible // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/frankban/quicktest v1.10.2 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/git-hooks/git-hooks v1.3.1 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0
github.com/klauspost/compress v1.10.11 // indirect
@ -23,12 +20,12 @@ require (
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/onsi/ginkgo v1.12.1 // indirect
github.com/onsi/gomega v1.10.0 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 // indirect
github.com/pingcap/errors v0.11.4 // indirect
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 // indirect
github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48 // indirect
github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/prometheus/common v0.10.0 // indirect
github.com/prometheus/procfs v0.1.3 // indirect
@ -38,9 +35,7 @@ require (
github.com/spf13/cast v1.3.0
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
github.com/tikv/client-go v0.0.0-20200824032810-95774393107b // indirect
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible // indirect
github.com/tikv/client-go v0.0.0-20200824032810-95774393107b
github.com/urfave/cli v1.22.5 // indirect
github.com/yahoo/athenz v1.9.16 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
@ -55,7 +50,7 @@ require (
google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150 // indirect
google.golang.org/grpc v1.31.0
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v2 v2.3.0
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
@ -15,8 +15,6 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw=
github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
@ -26,8 +24,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1C
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/apache/pulsar-client-go v0.1.1 h1:v/kU+2ZCC6yFIcbZrFtWa9/nvVzVr18L+xYJUvZSxEQ=
github.com/apache/pulsar-client-go v0.1.1/go.mod h1:mlxC65KL1BLhGO2bnT9zWMttVzR2czVPb27D477YpyU=
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
@ -121,7 +117,6 @@ github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18h
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -348,7 +343,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/protocolbuffers/protobuf v3.14.0+incompatible h1:8r0H76h/Q/lEnFFY60AuM23NOnaDMi6bd7zuboSYM+o=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446 h1:/NRJ5vAYoqz+7sG51ubIDHXeWO8DlTSrToPu6q11ziA=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@ -409,12 +403,6 @@ github.com/tikv/client-go v0.0.0-20200824032810-95774393107b/go.mod h1:K0NcdVNrX
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber/jaeger-client-go v1.6.0 h1:3+zLlq+4npI5fg8IsgAje3YsP7TcEdNzJScyqFIzxEQ=
github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U=
github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo=
github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.2/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43/go.mod h1:iT03XoTwV7xq/+UGwKO3UbC1nNNlopQiY61beSdrtOA=
github.com/unrolled/render v1.0.0 h1:XYtvhA3UkpB7PqkvhUFYmpKD55OudoIeygcfus4vcd4=
@ -55,7 +55,6 @@ IndexWrapper::parse_impl(const std::string& serialized_params_str, knowhere::Con
auto stoi_closure = [](const std::string& s) -> int { return std::stoi(s); };
auto stof_closure = [](const std::string& s) -> int { return std::stof(s); };
/***************************** meta *******************************/
check_parameter<int>(conf, milvus::knowhere::meta::DIM, stoi_closure, std::nullopt);
@ -89,7 +88,7 @@ IndexWrapper::parse_impl(const std::string& serialized_params_str, knowhere::Con
check_parameter<int>(conf, milvus::knowhere::IndexParams::edge_size, stoi_closure, std::nullopt);
/************************** NGT Search Params *****************************/
check_parameter<float>(conf, milvus::knowhere::IndexParams::epsilon, stof_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::epsilon, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::max_search_edges, stoi_closure, std::nullopt);
/************************** NGT_PANNG Params *****************************/
@ -275,12 +274,6 @@ IndexWrapper::QueryWithParam(const knowhere::DatasetPtr& dataset, const char* se
IndexWrapper::QueryImpl(const knowhere::DatasetPtr& dataset, const knowhere::Config& conf) {
auto load_raw_data_closure = [&]() { LoadRawData(); }; // hide this pointer
auto index_type = get_index_type();
if (is_in_nm_list(index_type)) {
std::call_once(raw_data_loaded_, load_raw_data_closure);
auto res = index_->Query(dataset, conf, nullptr);
auto ids = res->Get<int64_t*>(milvus::knowhere::meta::IDS);
auto distances = res->Get<float*>(milvus::knowhere::meta::DISTANCE);
@ -298,19 +291,5 @@ IndexWrapper::QueryImpl(const knowhere::DatasetPtr& dataset, const knowhere::Con
return std::move(query_res);
IndexWrapper::LoadRawData() {
auto index_type = get_index_type();
if (is_in_nm_list(index_type)) {
auto bs = index_->Serialize(config_);
auto bptr = std::make_shared<milvus::knowhere::Binary>();
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
bptr->data = std::shared_ptr<uint8_t[]>(static_cast<uint8_t*>(raw_data_.data()), deleter);
bptr->size = raw_data_.size();
bs.Append(RAW_DATA, bptr);
} // namespace indexbuilder
} // namespace milvus
@ -66,9 +66,6 @@ class IndexWrapper {
StoreRawData(const knowhere::DatasetPtr& dataset);
template <typename T>
check_parameter(knowhere::Config& conf,
@ -95,7 +92,6 @@ class IndexWrapper {
milvus::json index_config_;
knowhere::Config config_;
std::vector<uint8_t> raw_data_;
std::once_flag raw_data_loaded_;
} // namespace indexbuilder
@ -11,8 +11,6 @@
#include <tuple>
#include <map>
#include <limits>
#include <math.h>
#include <gtest/gtest.h>
#include <google/protobuf/text_format.h>
@ -43,16 +41,16 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_IDMAP) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, K},
{milvus::knowhere::Metric::TYPE, metric_type},
{milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, 4},
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_IVFPQ) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, K},
{milvus::knowhere::IndexParams::nlist, 100},
{milvus::knowhere::IndexParams::nprobe, 4},
// {milvus::knowhere::IndexParams::nprobe, 4},
{milvus::knowhere::IndexParams::m, 4},
{milvus::knowhere::IndexParams::nbits, 8},
{milvus::knowhere::Metric::TYPE, metric_type},
@ -61,9 +59,9 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, K},
{milvus::knowhere::IndexParams::nlist, 100},
{milvus::knowhere::IndexParams::nprobe, 4},
// {milvus::knowhere::IndexParams::nprobe, 4},
{milvus::knowhere::Metric::TYPE, metric_type},
{milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, 4},
@ -73,9 +71,9 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_IVFSQ8) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, K},
{milvus::knowhere::IndexParams::nlist, 100},
{milvus::knowhere::IndexParams::nprobe, 4},
// {milvus::knowhere::IndexParams::nprobe, 4},
{milvus::knowhere::IndexParams::nbits, 8},
{milvus::knowhere::Metric::TYPE, metric_type},
{milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, 4},
@ -86,9 +84,9 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, K},
{milvus::knowhere::IndexParams::nlist, 100},
{milvus::knowhere::IndexParams::nprobe, 4},
// {milvus::knowhere::IndexParams::nprobe, 4},
{milvus::knowhere::IndexParams::m, 4},
{milvus::knowhere::IndexParams::nbits, 8},
{milvus::knowhere::Metric::TYPE, metric_type},
@ -97,14 +95,13 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, K},
{milvus::knowhere::Metric::TYPE, metric_type},
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_NSG) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::IndexParams::nlist, 163},
{milvus::knowhere::meta::TOPK, K},
{milvus::knowhere::IndexParams::nprobe, 8},
{milvus::knowhere::IndexParams::knng, 20},
{milvus::knowhere::IndexParams::search_length, 40},
@ -130,14 +127,17 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_HNSW) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM}, {milvus::knowhere::meta::TOPK, K},
{milvus::knowhere::IndexParams::M, 16}, {milvus::knowhere::IndexParams::efConstruction, 200},
{milvus::knowhere::IndexParams::ef, 200}, {milvus::knowhere::Metric::TYPE, metric_type},
{milvus::knowhere::meta::DIM, DIM},
// {milvus::knowhere::meta::TOPK, 10},
{milvus::knowhere::IndexParams::M, 16},
{milvus::knowhere::IndexParams::efConstruction, 200},
{milvus::knowhere::IndexParams::ef, 200},
{milvus::knowhere::Metric::TYPE, metric_type},
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_ANNOY) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, 10},
{milvus::knowhere::IndexParams::n_trees, 4},
{milvus::knowhere::IndexParams::search_k, 100},
{milvus::knowhere::Metric::TYPE, metric_type},
@ -146,7 +146,7 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_RHNSWFlat) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, 10},
{milvus::knowhere::IndexParams::M, 16},
{milvus::knowhere::IndexParams::efConstruction, 200},
{milvus::knowhere::IndexParams::ef, 200},
@ -156,7 +156,7 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_RHNSWPQ) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, 10},
{milvus::knowhere::IndexParams::M, 16},
{milvus::knowhere::IndexParams::efConstruction, 200},
{milvus::knowhere::IndexParams::ef, 200},
@ -167,7 +167,7 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_RHNSWSQ) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, 10},
{milvus::knowhere::IndexParams::M, 16},
{milvus::knowhere::IndexParams::efConstruction, 200},
{milvus::knowhere::IndexParams::ef, 200},
@ -177,7 +177,7 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_NGTPANNG) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, 10},
{milvus::knowhere::Metric::TYPE, metric_type},
{milvus::knowhere::IndexParams::edge_size, 10},
{milvus::knowhere::IndexParams::epsilon, 0.1},
@ -189,7 +189,7 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
} else if (index_type == milvus::knowhere::IndexEnum::INDEX_NGTONNG) {
return milvus::knowhere::Config{
{milvus::knowhere::meta::DIM, DIM},
{milvus::knowhere::meta::TOPK, K},
// {milvus::knowhere::meta::TOPK, 10},
{milvus::knowhere::Metric::TYPE, metric_type},
{milvus::knowhere::IndexParams::edge_size, 20},
{milvus::knowhere::IndexParams::epsilon, 0.1},
@ -234,99 +234,6 @@ GenDataset(int64_t N, const milvus::knowhere::MetricType& metric_type, bool is_b
return milvus::segcore::DataGen(schema, N);
using QueryResultPtr = std::unique_ptr<milvus::indexbuilder::IndexWrapper::QueryResult>;
PrintQueryResult(const QueryResultPtr& result) {
auto nq = result->nq;
auto k = result->topk;
std::stringstream ss_id;
std::stringstream ss_dist;
for (auto i = 0; i < nq; i++) {
for (auto j = 0; j < k; ++j) {
ss_id << result->ids[i * k + j] << " ";
ss_dist << result->distances[i * k + j] << " ";
ss_id << std::endl;
ss_dist << std::endl;
std::cout << "id\n" << ss_id.str() << std::endl;
std::cout << "dist\n" << ss_dist.str() << std::endl;
L2(const float* point_a, const float* point_b, int dim) {
float dis = 0;
for (auto i = 0; i < dim; i++) {
auto c_a = point_a[i];
auto c_b = point_b[i];
dis += pow(c_b - c_a, 2);
return dis;
int hamming_weight(uint8_t n) {
int count=0;
while(n != 0){
count += n&1;
n >>= 1;
return count;
Jaccard(const uint8_t* point_a, const uint8_t* point_b, int dim) {
float dis;
int len = dim / 8;
float intersection = 0;
float union_num = 0;
for (int i = 0; i < len; i++) {
intersection += hamming_weight(point_a[i] & point_b[i]);
union_num += hamming_weight(point_a[i] | point_b[i]);
dis = 1 - (intersection / union_num);
return dis;
CountDistance(const void* point_a,
const void* point_b,
int dim,
const milvus::knowhere::MetricType& metric,
bool is_binary = false) {
if (point_a == nullptr || point_b == nullptr) {
return std::numeric_limits<float>::max();
if (metric == milvus::knowhere::Metric::L2) {
return L2(static_cast<const float*>(point_a), static_cast<const float*>(point_b), dim);
} else if (metric == milvus::knowhere::Metric::JACCARD) {
return Jaccard(static_cast<const uint8_t*>(point_a), static_cast<const uint8_t*>(point_b), dim);
} else {
return std::numeric_limits<float>::max();
CheckDistances(const QueryResultPtr& result,
const milvus::knowhere::DatasetPtr& base_dataset,
const milvus::knowhere::DatasetPtr& query_dataset,
const milvus::knowhere::MetricType& metric,
const float threshold = 1.0e-5) {
auto base_vecs = base_dataset->Get<float*>(milvus::knowhere::meta::TENSOR);
auto query_vecs = query_dataset->Get<float*>(milvus::knowhere::meta::TENSOR);
auto dim = base_dataset->Get<int64_t>(milvus::knowhere::meta::DIM);
auto nq = result->nq;
auto k = result->topk;
for (auto i = 0; i < nq; i++) {
for (auto j = 0; j < k; ++j) {
auto dis = result->distances[i * k + j];
auto id = result->ids[i * k + j];
auto count_dis = CountDistance(query_vecs + i * dim, base_vecs + id * dim, dim, metric);
// assert(std::abs(dis - count_dis) < threshold);
} // namespace
using Param = std::pair<milvus::knowhere::IndexType, milvus::knowhere::MetricType>;
@ -340,26 +247,8 @@ class IndexWrapperTest : public ::testing::TestWithParam<Param> {
metric_type = param.second;
std::tie(type_params, index_params) = generate_params(index_type, metric_type);
std::map<std::string, bool> is_binary_map = {
{milvus::knowhere::IndexEnum::INDEX_FAISS_IDMAP, false},
{milvus::knowhere::IndexEnum::INDEX_FAISS_IVFPQ, false},
{milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, false},
{milvus::knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, false},
{milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, true},
{milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP, true},
{milvus::knowhere::IndexEnum::INDEX_SPTAG_KDT_RNT, false},
{milvus::knowhere::IndexEnum::INDEX_SPTAG_BKT_RNT, false},
{milvus::knowhere::IndexEnum::INDEX_HNSW, false},
{milvus::knowhere::IndexEnum::INDEX_ANNOY, false},
{milvus::knowhere::IndexEnum::INDEX_RHNSWFlat, false},
{milvus::knowhere::IndexEnum::INDEX_RHNSWPQ, false},
{milvus::knowhere::IndexEnum::INDEX_RHNSWSQ, false},
{milvus::knowhere::IndexEnum::INDEX_NGTPANNG, false},
{milvus::knowhere::IndexEnum::INDEX_NGTONNG, false},
{milvus::knowhere::IndexEnum::INDEX_NSG, false},
std::map<std::string, bool> is_binary_map = {{milvus::knowhere::IndexEnum::INDEX_FAISS_IVFPQ, false},
{milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, true}};
is_binary = is_binary_map[index_type];
@ -373,13 +262,9 @@ class IndexWrapperTest : public ::testing::TestWithParam<Param> {
if (!is_binary) {
xb_data = dataset.get_col<float>(0);
xb_dataset = milvus::knowhere::GenDataset(NB, DIM, xb_data.data());
xq_data = dataset.get_col<float>(0);
xq_dataset = milvus::knowhere::GenDataset(NQ, DIM, xq_data.data());
} else {
xb_bin_data = dataset.get_col<uint8_t>(0);
xb_dataset = milvus::knowhere::GenDataset(NB, DIM, xb_bin_data.data());
xq_bin_data = dataset.get_col<uint8_t>(0);
xq_dataset = milvus::knowhere::GenDataset(NQ, DIM, xq_bin_data.data());
@ -397,9 +282,6 @@ class IndexWrapperTest : public ::testing::TestWithParam<Param> {
std::vector<float> xb_data;
std::vector<uint8_t> xb_bin_data;
std::vector<milvus::knowhere::IDType> ids;
milvus::knowhere::DatasetPtr xq_dataset;
std::vector<float> xq_data;
std::vector<uint8_t> xq_bin_data;
TEST(PQ, Build) {
@ -426,47 +308,6 @@ TEST(IVFFLATNM, Build) {
ASSERT_NO_THROW(index->AddWithoutIds(xb_dataset, conf));
auto index_type = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
auto metric_type = milvus::knowhere::Metric::L2;
auto conf = generate_conf(index_type, metric_type);
auto index = milvus::knowhere::VecIndexFactory::GetInstance().CreateVecIndex(index_type);
auto dataset = GenDataset(NB, metric_type, false);
auto xb_data = dataset.get_col<float>(0);
auto xb_dataset = milvus::knowhere::GenDataset(NB, DIM, xb_data.data());
ASSERT_NO_THROW(index->Train(xb_dataset, conf));
ASSERT_NO_THROW(index->AddWithoutIds(xb_dataset, conf));
auto bs = index->Serialize(conf);
auto bptr = std::make_shared<milvus::knowhere::Binary>();
bptr->data = std::shared_ptr<uint8_t[]>((uint8_t*)xb_data.data(), [&](uint8_t*) {});
bptr->size = DIM * NB * sizeof(float);
bs.Append(RAW_DATA, bptr);
auto xq_data = dataset.get_col<float>(0);
auto xq_dataset = milvus::knowhere::GenDataset(NQ, DIM, xq_data.data());
auto result = index->Query(xq_dataset, conf, nullptr);
TEST(NSG, Query) {
auto index_type = milvus::knowhere::IndexEnum::INDEX_NSG;
auto metric_type = milvus::knowhere::Metric::L2;
auto conf = generate_conf(index_type, metric_type);
auto index = milvus::knowhere::VecIndexFactory::GetInstance().CreateVecIndex(index_type);
auto dataset = GenDataset(NB, metric_type, false);
auto xb_data = dataset.get_col<float>(0);
auto xb_dataset = milvus::knowhere::GenDataset(NB, DIM, xb_data.data());
index->BuildAll(xb_dataset, conf);
auto bs = index->Serialize(conf);
auto bptr = std::make_shared<milvus::knowhere::Binary>();
bptr->data = std::shared_ptr<uint8_t[]>((uint8_t*)xb_data.data(), [&](uint8_t*) {});
bptr->size = DIM * NB * sizeof(float);
bs.Append(RAW_DATA, bptr);
auto xq_data = dataset.get_col<float>(0);
auto xq_dataset = milvus::knowhere::GenDataset(NQ, DIM, xq_data.data());
auto result = index->Query(xq_dataset, conf, nullptr);
auto index_type = milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT;
auto metric_type = milvus::knowhere::Metric::JACCARD;
@ -644,7 +485,12 @@ TEST_P(IndexWrapperTest, Dim) {
TEST_P(IndexWrapperTest, BuildWithoutIds) {
auto index =
std::make_unique<milvus::indexbuilder::IndexWrapper>(type_params_str.c_str(), index_params_str.c_str());
if (milvus::indexbuilder::is_in_need_id_list(index_type)) {
} else {
TEST_P(IndexWrapperTest, Codec) {
@ -665,16 +511,3 @@ TEST_P(IndexWrapperTest, Codec) {
ASSERT_EQ(strcmp(binary.data, copy_binary.data), 0);
TEST_P(IndexWrapperTest, Query) {
auto index_wrapper =
std::make_unique<milvus::indexbuilder::IndexWrapper>(type_params_str.c_str(), index_params_str.c_str());
std::unique_ptr<milvus::indexbuilder::IndexWrapper::QueryResult> query_result = index_wrapper->Query(xq_dataset);
ASSERT_EQ(query_result->topk, K);
ASSERT_EQ(query_result->nq, NQ);
ASSERT_EQ(query_result->distances.size(), query_result->topk * query_result->nq);
ASSERT_EQ(query_result->ids.size(), query_result->topk * query_result->nq);
@ -218,7 +218,6 @@ func CreateServer(ctx context.Context) (*Master, error) {
m.grpcServer = grpc.NewServer()
masterpb.RegisterMasterServer(m.grpcServer, m)
return m, nil
@ -110,7 +110,6 @@ func TestMaster(t *testing.T) {
conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock())
require.Nil(t, err)
cli := masterpb.NewMasterClient(conn)
t.Run("TestConfigTask", func(t *testing.T) {
@ -887,6 +886,12 @@ func TestMaster(t *testing.T) {
var k2sMsgstream ms.MsgStream = k2sMs
assert.True(t, receiveTimeTickMsg(&k2sMsgstream))
conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock())
assert.Nil(t, err)
defer conn.Close()
cli := masterpb.NewMasterClient(conn)
sch := schemapb.CollectionSchema{
Name: "name" + strconv.FormatUint(rand.Uint64(), 10),
Description: "test collection",
@ -1,8 +1,6 @@
package msgstream
import (
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
@ -10,8 +8,6 @@ import (
type MsgType = internalPb.MsgType
type TsMsg interface {
GetContext() context.Context
BeginTs() Timestamp
EndTs() Timestamp
Type() MsgType
@ -21,7 +17,6 @@ type TsMsg interface {
type BaseMsg struct {
ctx context.Context
BeginTimestamp Timestamp
EndTimestamp Timestamp
HashValues []uint32
@ -49,14 +44,6 @@ func (it *InsertMsg) Type() MsgType {
return it.MsgType
func (it *InsertMsg) GetContext() context.Context {
return it.ctx
func (it *InsertMsg) SetContext(ctx context.Context) {
it.ctx = ctx
func (it *InsertMsg) Marshal(input TsMsg) ([]byte, error) {
insertMsg := input.(*InsertMsg)
insertRequest := &insertMsg.InsertRequest
@ -101,13 +88,6 @@ func (fl *FlushMsg) Type() MsgType {
return fl.GetMsgType()
func (fl *FlushMsg) GetContext() context.Context {
return fl.ctx
func (fl *FlushMsg) SetContext(ctx context.Context) {
fl.ctx = ctx
func (fl *FlushMsg) Marshal(input TsMsg) ([]byte, error) {
flushMsgTask := input.(*FlushMsg)
flushMsg := &flushMsgTask.FlushMsg
@ -141,14 +121,6 @@ func (dt *DeleteMsg) Type() MsgType {
return dt.MsgType
func (dt *DeleteMsg) GetContext() context.Context {
return dt.ctx
func (dt *DeleteMsg) SetContext(ctx context.Context) {
dt.ctx = ctx
func (dt *DeleteMsg) Marshal(input TsMsg) ([]byte, error) {
deleteTask := input.(*DeleteMsg)
deleteRequest := &deleteTask.DeleteRequest
@ -193,14 +165,6 @@ func (st *SearchMsg) Type() MsgType {
return st.MsgType
func (st *SearchMsg) GetContext() context.Context {
return st.ctx
func (st *SearchMsg) SetContext(ctx context.Context) {
st.ctx = ctx
func (st *SearchMsg) Marshal(input TsMsg) ([]byte, error) {
searchTask := input.(*SearchMsg)
searchRequest := &searchTask.SearchRequest
@ -234,14 +198,6 @@ func (srt *SearchResultMsg) Type() MsgType {
return srt.MsgType
func (srt *SearchResultMsg) GetContext() context.Context {
return srt.ctx
func (srt *SearchResultMsg) SetContext(ctx context.Context) {
srt.ctx = ctx
func (srt *SearchResultMsg) Marshal(input TsMsg) ([]byte, error) {
searchResultTask := input.(*SearchResultMsg)
searchResultRequest := &searchResultTask.SearchResult
@ -275,14 +231,6 @@ func (tst *TimeTickMsg) Type() MsgType {
return tst.MsgType
func (tst *TimeTickMsg) GetContext() context.Context {
return tst.ctx
func (tst *TimeTickMsg) SetContext(ctx context.Context) {
tst.ctx = ctx
func (tst *TimeTickMsg) Marshal(input TsMsg) ([]byte, error) {
timeTickTask := input.(*TimeTickMsg)
timeTick := &timeTickTask.TimeTickMsg
@ -316,14 +264,6 @@ func (qs *QueryNodeStatsMsg) Type() MsgType {
return qs.MsgType
func (qs *QueryNodeStatsMsg) GetContext() context.Context {
return qs.ctx
func (qs *QueryNodeStatsMsg) SetContext(ctx context.Context) {
qs.ctx = ctx
func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) ([]byte, error) {
queryNodeSegStatsTask := input.(*QueryNodeStatsMsg)
queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeStats
@ -365,14 +305,6 @@ func (cc *CreateCollectionMsg) Type() MsgType {
return cc.MsgType
func (cc *CreateCollectionMsg) GetContext() context.Context {
return cc.ctx
func (cc *CreateCollectionMsg) SetContext(ctx context.Context) {
cc.ctx = ctx
func (cc *CreateCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
createCollectionMsg := input.(*CreateCollectionMsg)
createCollectionRequest := &createCollectionMsg.CreateCollectionRequest
@ -405,13 +337,6 @@ type DropCollectionMsg struct {
func (dc *DropCollectionMsg) Type() MsgType {
return dc.MsgType
func (dc *DropCollectionMsg) GetContext() context.Context {
return dc.ctx
func (dc *DropCollectionMsg) SetContext(ctx context.Context) {
dc.ctx = ctx
func (dc *DropCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
dropCollectionMsg := input.(*DropCollectionMsg)
@ -436,20 +361,111 @@ func (dc *DropCollectionMsg) Unmarshal(input []byte) (TsMsg, error) {
return dropCollectionMsg, nil
type HasCollectionMsg struct {
func (hc *HasCollectionMsg) Type() MsgType {
return hc.MsgType
func (hc *HasCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
hasCollectionMsg := input.(*HasCollectionMsg)
hasCollectionRequest := &hasCollectionMsg.HasCollectionRequest
mb, err := proto.Marshal(hasCollectionRequest)
if err != nil {
return nil, err
return mb, nil
func (hc *HasCollectionMsg) Unmarshal(input []byte) (TsMsg, error) {
hasCollectionRequest := internalPb.HasCollectionRequest{}
err := proto.Unmarshal(input, &hasCollectionRequest)
if err != nil {
return nil, err
hasCollectionMsg := &HasCollectionMsg{HasCollectionRequest: hasCollectionRequest}
hasCollectionMsg.BeginTimestamp = hasCollectionMsg.Timestamp
hasCollectionMsg.EndTimestamp = hasCollectionMsg.Timestamp
return hasCollectionMsg, nil
type DescribeCollectionMsg struct {
func (dc *DescribeCollectionMsg) Type() MsgType {
return dc.MsgType
func (dc *DescribeCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
describeCollectionMsg := input.(*DescribeCollectionMsg)
describeCollectionRequest := &describeCollectionMsg.DescribeCollectionRequest
mb, err := proto.Marshal(describeCollectionRequest)
if err != nil {
return nil, err
return mb, nil
func (dc *DescribeCollectionMsg) Unmarshal(input []byte) (TsMsg, error) {
describeCollectionRequest := internalPb.DescribeCollectionRequest{}
err := proto.Unmarshal(input, &describeCollectionRequest)
if err != nil {
return nil, err
describeCollectionMsg := &DescribeCollectionMsg{DescribeCollectionRequest: describeCollectionRequest}
describeCollectionMsg.BeginTimestamp = describeCollectionMsg.Timestamp
describeCollectionMsg.EndTimestamp = describeCollectionMsg.Timestamp
return describeCollectionMsg, nil
type ShowCollectionMsg struct {
func (sc *ShowCollectionMsg) Type() MsgType {
return sc.MsgType
func (sc *ShowCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
showCollectionMsg := input.(*ShowCollectionMsg)
showCollectionRequest := &showCollectionMsg.ShowCollectionRequest
mb, err := proto.Marshal(showCollectionRequest)
if err != nil {
return nil, err
return mb, nil
func (sc *ShowCollectionMsg) Unmarshal(input []byte) (TsMsg, error) {
showCollectionRequest := internalPb.ShowCollectionRequest{}
err := proto.Unmarshal(input, &showCollectionRequest)
if err != nil {
return nil, err
showCollectionMsg := &ShowCollectionMsg{ShowCollectionRequest: showCollectionRequest}
showCollectionMsg.BeginTimestamp = showCollectionMsg.Timestamp
showCollectionMsg.EndTimestamp = showCollectionMsg.Timestamp
return showCollectionMsg, nil
type CreatePartitionMsg struct {
func (cc *CreatePartitionMsg) GetContext() context.Context {
return cc.ctx
func (cc *CreatePartitionMsg) SetContext(ctx context.Context) {
cc.ctx = ctx
func (cc *CreatePartitionMsg) Type() MsgType {
return cc.MsgType
@ -483,14 +499,6 @@ type DropPartitionMsg struct {
func (dc *DropPartitionMsg) GetContext() context.Context {
return dc.ctx
func (dc *DropPartitionMsg) SetContext(ctx context.Context) {
dc.ctx = ctx
func (dc *DropPartitionMsg) Type() MsgType {
return dc.MsgType
@ -518,6 +526,105 @@ func (dc *DropPartitionMsg) Unmarshal(input []byte) (TsMsg, error) {
return dropPartitionMsg, nil
type HasPartitionMsg struct {
func (hc *HasPartitionMsg) Type() MsgType {
return hc.MsgType
func (hc *HasPartitionMsg) Marshal(input TsMsg) ([]byte, error) {
hasPartitionMsg := input.(*HasPartitionMsg)
hasPartitionRequest := &hasPartitionMsg.HasPartitionRequest
mb, err := proto.Marshal(hasPartitionRequest)
if err != nil {
return nil, err
return mb, nil
func (hc *HasPartitionMsg) Unmarshal(input []byte) (TsMsg, error) {
hasPartitionRequest := internalPb.HasPartitionRequest{}
err := proto.Unmarshal(input, &hasPartitionRequest)
if err != nil {
return nil, err
hasPartitionMsg := &HasPartitionMsg{HasPartitionRequest: hasPartitionRequest}
hasPartitionMsg.BeginTimestamp = hasPartitionMsg.Timestamp
hasPartitionMsg.EndTimestamp = hasPartitionMsg.Timestamp
return hasPartitionMsg, nil
type DescribePartitionMsg struct {
func (dc *DescribePartitionMsg) Type() MsgType {
return dc.MsgType
func (dc *DescribePartitionMsg) Marshal(input TsMsg) ([]byte, error) {
describePartitionMsg := input.(*DescribePartitionMsg)
describePartitionRequest := &describePartitionMsg.DescribePartitionRequest
mb, err := proto.Marshal(describePartitionRequest)
if err != nil {
return nil, err
return mb, nil
func (dc *DescribePartitionMsg) Unmarshal(input []byte) (TsMsg, error) {
describePartitionRequest := internalPb.DescribePartitionRequest{}
err := proto.Unmarshal(input, &describePartitionRequest)
if err != nil {
return nil, err
describePartitionMsg := &DescribePartitionMsg{DescribePartitionRequest: describePartitionRequest}
describePartitionMsg.BeginTimestamp = describePartitionMsg.Timestamp
describePartitionMsg.EndTimestamp = describePartitionMsg.Timestamp
return describePartitionMsg, nil
type ShowPartitionMsg struct {
func (sc *ShowPartitionMsg) Type() MsgType {
return sc.MsgType
func (sc *ShowPartitionMsg) Marshal(input TsMsg) ([]byte, error) {
showPartitionMsg := input.(*ShowPartitionMsg)
showPartitionRequest := &showPartitionMsg.ShowPartitionRequest
mb, err := proto.Marshal(showPartitionRequest)
if err != nil {
return nil, err
return mb, nil
func (sc *ShowPartitionMsg) Unmarshal(input []byte) (TsMsg, error) {
showPartitionRequest := internalPb.ShowPartitionRequest{}
err := proto.Unmarshal(input, &showPartitionRequest)
if err != nil {
return nil, err
showPartitionMsg := &ShowPartitionMsg{ShowPartitionRequest: showPartitionRequest}
showPartitionMsg.BeginTimestamp = showPartitionMsg.Timestamp
showPartitionMsg.EndTimestamp = showPartitionMsg.Timestamp
return showPartitionMsg, nil
type LoadIndexMsg struct {
@ -528,14 +635,6 @@ func (lim *LoadIndexMsg) Type() MsgType {
return lim.MsgType
func (lim *LoadIndexMsg) GetContext() context.Context {
return lim.ctx
func (lim *LoadIndexMsg) SetContext(ctx context.Context) {
lim.ctx = ctx
func (lim *LoadIndexMsg) Marshal(input TsMsg) ([]byte, error) {
loadIndexMsg := input.(*LoadIndexMsg)
loadIndexRequest := &loadIndexMsg.LoadIndex
@ -4,13 +4,9 @@ import (
@ -155,29 +151,6 @@ func (ms *PulsarMsgStream) Close() {
type propertiesReaderWriter struct {
ppMap map[string]string
func (ppRW *propertiesReaderWriter) Set(key, val string) {
// The GRPC HPACK implementation rejects any uppercase keys here.
// As such, since the HTTP_HEADERS format is case-insensitive anyway, we
// blindly lowercase the key (which is guaranteed to work in the
// Inject/Extract sense per the OpenTracing spec).
key = strings.ToLower(key)
ppRW.ppMap[key] = val
func (ppRW *propertiesReaderWriter) ForeachKey(handler func(key, val string) error) error {
for k, val := range ppRW.ppMap {
if err := handler(k, val); err != nil {
return err
return nil
func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
tsMsgs := msgPack.Msgs
if len(tsMsgs) <= 0 {
@ -227,41 +200,12 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
if err != nil {
return err
msg := &pulsar.ProducerMessage{Payload: mb}
var child opentracing.Span
if v.Msgs[i].Type() == internalPb.MsgType_kInsert || v.Msgs[i].Type() == internalPb.MsgType_kSearch {
tracer := opentracing.GlobalTracer()
ctx := v.Msgs[i].GetContext()
if ctx == nil {
ctx = context.Background()
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("start send pulsar msg",
} else {
child = tracer.StartSpan("start send pulsar msg")
child.SetTag("hash keys", v.Msgs[i].HashKeys())
child.SetTag("start time", v.Msgs[i].BeginTs())
child.SetTag("end time", v.Msgs[i].EndTs())
msg.Properties = make(map[string]string)
err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
if err != nil {
return err
if _, err := (*ms.producers[k]).Send(
&pulsar.ProducerMessage{Payload: mb},
); err != nil {
return err
if child != nil {
return nil
@ -274,34 +218,10 @@ func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error {
if err != nil {
return err
msg := &pulsar.ProducerMessage{Payload: mb}
if v.Type() == internalPb.MsgType_kInsert || v.Type() == internalPb.MsgType_kSearch {
tracer := opentracing.GlobalTracer()
ctx := v.GetContext()
if ctx == nil {
ctx = context.Background()
var child opentracing.Span
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("start send pulsar msg",
} else {
child = tracer.StartSpan("start send pulsar msg, start time: %d")
child.SetTag("hash keys", v.HashKeys())
child.SetTag("start time", v.BeginTs())
child.SetTag("end time", v.EndTs())
msg.Properties = make(map[string]string)
err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
if err != nil {
return err
for i := 0; i < producerLen; i++ {
if _, err := (*ms.producers[i]).Send(
&pulsar.ProducerMessage{Payload: mb},
); err != nil {
return err
@ -338,7 +258,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
for {
select {
case <-ms.ctx.Done():
tsMsgList := make([]TsMsg, 0)
@ -351,7 +270,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
pulsarMsg, ok := value.Interface().(pulsar.ConsumerMessage)
if !ok {
log.Printf("type assertion failed, not consumer message type")
@ -365,21 +283,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.MsgType)
if tsMsg.Type() == internalPb.MsgType_kInsert || tsMsg.Type() == internalPb.MsgType_kSearch {
tracer := opentracing.GlobalTracer()
spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
if err != nil {
log.Println("extract message err")
span := opentracing.StartSpan("pulsar msg received",
span.SetTag("hash keys", tsMsg.HashKeys())
span.SetTag("start time", tsMsg.BeginTs())
span.SetTag("end time", tsMsg.EndTs())
tsMsg.SetContext(opentracing.ContextWithSpan(context.Background(), span))
if err != nil {
log.Printf("Failed to unmarshal tsMsg, error = %v", err)
@ -517,23 +420,6 @@ func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int,
if err != nil {
log.Printf("Failed to unmarshal, error = %v", err)
if tsMsg.Type() == internalPb.MsgType_kInsert || tsMsg.Type() == internalPb.MsgType_kSearch {
tracer := opentracing.GlobalTracer()
spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
if err != nil {
log.Println("extract message err")
span := opentracing.StartSpan("pulsar msg received",
span.SetTag("hash keys", tsMsg.HashKeys())
span.SetTag("start time", tsMsg.BeginTs())
span.SetTag("end time", tsMsg.EndTs())
tsMsg.SetContext(opentracing.ContextWithSpan(context.Background(), span))
if headerMsg.MsgType == internalPb.MsgType_kTimeTick {
eofMsgMap[channelIndex] = tsMsg.(*TimeTickMsg).Timestamp
@ -614,7 +500,7 @@ func insertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
if request.Type() != internalPb.MsgType_kInsert {
return nil, errors.New("msg's must be Insert")
return nil, errors.New(string("msg's must be Insert"))
insertRequest := request.(*InsertMsg)
keys := hashKeys[i]
@ -625,7 +511,7 @@ func insertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
keysLen := len(keys)
if keysLen != timestampLen || keysLen != rowIDLen || keysLen != rowDataLen {
return nil, errors.New("the length of hashValue, timestamps, rowIDs, RowData are not equal")
return nil, errors.New(string("the length of hashValue, timestamps, rowIDs, RowData are not equal"))
for index, key := range keys {
_, ok := result[key]
@ -648,9 +534,6 @@ func insertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
insertMsg := &InsertMsg{
BaseMsg: BaseMsg{
ctx: request.GetContext(),
InsertRequest: sliceRequest,
result[key].Msgs = append(result[key].Msgs, insertMsg)
@ -663,7 +546,7 @@ func deleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
if request.Type() != internalPb.MsgType_kDelete {
return nil, errors.New("msg's must be Delete")
return nil, errors.New(string("msg's must be Delete"))
deleteRequest := request.(*DeleteMsg)
keys := hashKeys[i]
@ -673,7 +556,7 @@ func deleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
keysLen := len(keys)
if keysLen != timestampLen || keysLen != primaryKeysLen {
return nil, errors.New("the length of hashValue, timestamps, primaryKeys are not equal")
return nil, errors.New(string("the length of hashValue, timestamps, primaryKeys are not equal"))
for index, key := range keys {
@ -707,7 +590,7 @@ func defaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack,
for i, request := range tsMsgs {
keys := hashKeys[i]
if len(keys) != 1 {
return nil, errors.New("len(msg.hashValue) must equal 1")
return nil, errors.New(string("len(msg.hashValue) must equal 1"))
key := keys[0]
_, ok := result[key]
@ -2,8 +2,6 @@ package proxy
import (
@ -11,10 +9,6 @@ import (
@ -45,9 +39,6 @@ type Proxy struct {
manipulationMsgStream *msgstream.PulsarMsgStream
queryMsgStream *msgstream.PulsarMsgStream
tracer opentracing.Tracer
closer io.Closer
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
@ -60,28 +51,11 @@ func Init() {
func CreateProxy(ctx context.Context) (*Proxy, error) {
ctx1, cancel := context.WithCancel(ctx)
var err error
p := &Proxy{
proxyLoopCtx: ctx1,
proxyLoopCancel: cancel,
cfg := &config.Configuration{
ServiceName: "tracing",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
Reporter: &config.ReporterConfig{
LogSpans: true,
p.tracer, p.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
pulsarAddress := Params.PulsarAddress()
p.queryMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamSearchBufSize())
@ -224,8 +198,6 @@ func (p *Proxy) stopProxyLoop() {
// Close closes the server.
@ -182,7 +182,6 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
insertMsg := &msgstream.InsertMsg{
InsertRequest: sliceRequest,
if together { // all rows with same hash value are accumulated to only one message
if len(result[key].Msgs) <= 0 {
result[key].Msgs = append(result[key].Msgs, insertMsg)
@ -7,9 +7,6 @@ import (
oplog "github.com/opentracing/opentracing-go/log"
@ -77,21 +74,12 @@ func (it *InsertTask) Type() internalpb.MsgType {
func (it *InsertTask) PreExecute() error {
span := opentracing.StartSpan("InsertTask preExecute")
defer span.Finish()
it.ctx = opentracing.ContextWithSpan(it.ctx, span)
span.SetTag("hash keys", it.ReqID)
span.SetTag("start time", it.BeginTs())
collectionName := it.BaseInsertTask.CollectionName
if err := ValidateCollectionName(collectionName); err != nil {
return err
partitionTag := it.BaseInsertTask.PartitionTag
if err := ValidatePartitionTag(partitionTag, true); err != nil {
return err
@ -99,36 +87,22 @@ func (it *InsertTask) PreExecute() error {
func (it *InsertTask) Execute() error {
span, ctx := opentracing.StartSpanFromContext(it.ctx, "InsertTask Execute")
defer span.Finish()
it.ctx = ctx
span.SetTag("hash keys", it.ReqID)
span.SetTag("start time", it.BeginTs())
collectionName := it.BaseInsertTask.CollectionName
span.LogFields(oplog.String("collection_name", collectionName))
if !globalMetaCache.Hit(collectionName) {
err := globalMetaCache.Sync(collectionName)
if err != nil {
return err
description, err := globalMetaCache.Get(collectionName)
if err != nil || description == nil {
return err
autoID := description.Schema.AutoID
span.LogFields(oplog.Bool("auto_id", autoID))
var rowIDBegin UniqueID
var rowIDEnd UniqueID
rowNums := len(it.BaseInsertTask.RowData)
rowIDBegin, rowIDEnd, _ = it.rowIDAllocator.Alloc(uint32(rowNums))
span.LogFields(oplog.Int("rowNums", rowNums),
oplog.Int("rowIDBegin", int(rowIDBegin)),
oplog.Int("rowIDEnd", int(rowIDEnd)))
it.BaseInsertTask.RowIDs = make([]UniqueID, rowNums)
for i := rowIDBegin; i < rowIDEnd; i++ {
offset := i - rowIDBegin
@ -151,8 +125,6 @@ func (it *InsertTask) Execute() error {
EndTs: it.EndTs(),
Msgs: make([]msgstream.TsMsg, 1),
span.LogFields(oplog.String("send msg", "send msg"))
msgPack.Msgs[0] = tsMsg
err = it.manipulationMsgStream.Produce(msgPack)
@ -166,14 +138,11 @@ func (it *InsertTask) Execute() error {
if err != nil {
it.result.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
it.result.Status.Reason = err.Error()
return nil
func (it *InsertTask) PostExecute() error {
span, _ := opentracing.StartSpanFromContext(it.ctx, "InsertTask postExecute")
defer span.Finish()
return nil
@ -383,38 +352,24 @@ func (qt *QueryTask) SetTs(ts Timestamp) {
func (qt *QueryTask) PreExecute() error {
span := opentracing.StartSpan("InsertTask preExecute")
defer span.Finish()
qt.ctx = opentracing.ContextWithSpan(qt.ctx, span)
span.SetTag("hash keys", qt.ReqID)
span.SetTag("start time", qt.BeginTs())
collectionName := qt.query.CollectionName
if !globalMetaCache.Hit(collectionName) {
err := globalMetaCache.Sync(collectionName)
if err != nil {
return err
_, err := globalMetaCache.Get(collectionName)
if err != nil { // err is not nil if collection not exists
return err
if err := ValidateCollectionName(qt.query.CollectionName); err != nil {
return err
for _, tag := range qt.query.PartitionTags {
if err := ValidatePartitionTag(tag, false); err != nil {
return err
@ -424,8 +379,6 @@ func (qt *QueryTask) PreExecute() error {
queryBytes, err := proto.Marshal(qt.query)
if err != nil {
return err
qt.Query = &commonpb.Blob{
@ -435,10 +388,6 @@ func (qt *QueryTask) PreExecute() error {
func (qt *QueryTask) Execute() error {
span, ctx := opentracing.StartSpanFromContext(qt.ctx, "InsertTask Execute")
defer span.Finish()
span.SetTag("hash keys", qt.ReqID)
span.SetTag("start time", qt.BeginTs())
var tsMsg msgstream.TsMsg = &msgstream.SearchMsg{
SearchRequest: qt.SearchRequest,
BaseMsg: msgstream.BaseMsg{
@ -452,28 +401,20 @@ func (qt *QueryTask) Execute() error {
EndTs: qt.Timestamp,
Msgs: make([]msgstream.TsMsg, 1),
msgPack.Msgs[0] = tsMsg
err := qt.queryMsgStream.Produce(msgPack)
log.Printf("[Proxy] length of searchMsg: %v", len(msgPack.Msgs))
if err != nil {
log.Printf("[Proxy] send search request failed: %v", err)
return err
func (qt *QueryTask) PostExecute() error {
span, _ := opentracing.StartSpanFromContext(qt.ctx, "InsertTask postExecute")
span.SetTag("hash keys", qt.ReqID)
span.SetTag("start time", qt.BeginTs())
for {
select {
case <-qt.ctx.Done():
log.Print("wait to finish failed, timeout!")
span.LogFields(oplog.String("wait to finish failed, timeout", "wait to finish failed, timeout"))
return errors.New("wait to finish failed, timeout")
case searchResults := <-qt.resultBuf:
filterSearchResult := make([]*internalpb.SearchResult, 0)
@ -494,8 +435,6 @@ func (qt *QueryTask) PostExecute() error {
Reason: filterReason,
return errors.New(filterReason)
@ -526,7 +465,6 @@ func (qt *QueryTask) PostExecute() error {
Reason: filterReason,
return nil
@ -538,7 +476,6 @@ func (qt *QueryTask) PostExecute() error {
Reason: filterReason,
return nil
@ -589,13 +526,10 @@ func (qt *QueryTask) PostExecute() error {
reducedHitsBs, err := proto.Marshal(reducedHits)
if err != nil {
log.Println("marshal error")
return err
qt.result.Hits = append(qt.result.Hits, reducedHitsBs)
return nil
@ -703,10 +637,7 @@ func (dct *DescribeCollectionTask) PreExecute() error {
func (dct *DescribeCollectionTask) Execute() error {
var err error
dct.result, err = dct.masterClient.DescribeCollection(dct.ctx, &dct.DescribeCollectionRequest)
if err != nil {
return err
err = globalMetaCache.Update(dct.CollectionName.CollectionName, dct.result)
globalMetaCache.Update(dct.CollectionName.CollectionName, dct.result)
return err
@ -48,6 +48,7 @@ func (dsService *dataSyncService) initNodes() {
var insertNode Node = newInsertNode(dsService.replica)
var serviceTimeNode Node = newServiceTimeNode(dsService.replica)
var gcNode Node = newGCNode(dsService.replica)
@ -57,6 +58,7 @@ func (dsService *dataSyncService) initNodes() {
// dmStreamNode
var err = dsService.fg.SetEdges(dmStreamNode.Name(),
@ -106,9 +108,17 @@ func (dsService *dataSyncService) initNodes() {
// serviceTimeNode
err = dsService.fg.SetEdges(serviceTimeNode.Name(),
if err != nil {
log.Fatal("set edges failed in node:", serviceTimeNode.Name())
// gcNode
err = dsService.fg.SetEdges(gcNode.Name(),
if err != nil {
log.Fatal("set edges failed in node:", gcNode.Name())
@ -44,6 +44,11 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
ddNode.ddMsg = &ddMsg
gcRecord := gcRecord{
collections: make([]UniqueID, 0),
partitions: make([]partitionWithID, 0),
ddNode.ddMsg.gcRecord = &gcRecord
// sort tsMessages
tsMessages := msMsg.TsMessages()
@ -115,11 +120,11 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
collectionID := msg.CollectionID
err := ddNode.replica.removeCollection(collectionID)
if err != nil {
//err := ddNode.replica.removeCollection(collectionID)
//if err != nil {
// log.Println(err)
// return
collectionName := msg.CollectionName.CollectionName
ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName],
@ -127,6 +132,8 @@ func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
createOrDrop: false,
timestamp: msg.Timestamp,
ddNode.ddMsg.gcRecord.collections = append(ddNode.ddMsg.gcRecord.collections, collectionID)
func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
@ -150,17 +157,22 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
collectionID := msg.CollectionID
partitionTag := msg.PartitionName.Tag
err := ddNode.replica.removePartition(collectionID, partitionTag)
if err != nil {
//err := ddNode.replica.removePartition(collectionID, partitionTag)
//if err != nil {
// log.Println(err)
// return
ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag],
createOrDrop: false,
timestamp: msg.Timestamp,
ddNode.ddMsg.gcRecord.partitions = append(ddNode.ddMsg.gcRecord.partitions, partitionWithID{
partitionTag: partitionTag,
collectionID: collectionID,
func newDDNode(replica collectionReplica) *ddNode {
@ -2,6 +2,7 @@ package querynode
import (
@ -59,6 +60,7 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
iMsg.gcRecord = ddMsg.gcRecord
var res Msg = &iMsg
return []*Msg{&res}
@ -81,17 +83,35 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
log.Println("Error, misaligned messages detected")
return nil
tmpTimestamps := make([]Timestamp, 0)
tmpRowIDs := make([]int64, 0)
tmpRowData := make([]*commonpb.Blob, 0)
targetTimestamp := records[len(records)-1].timestamp
// calculate valid time range
timeBegin := Timestamp(0)
timeEnd := Timestamp(math.MaxUint64)
for _, record := range records {
if record.createOrDrop && timeBegin < record.timestamp {
timeBegin = record.timestamp
if !record.createOrDrop && timeEnd > record.timestamp {
timeEnd = record.timestamp
for i, t := range msg.Timestamps {
if t >= targetTimestamp {
if t >= timeBegin && t <= timeEnd {
tmpTimestamps = append(tmpTimestamps, t)
tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i])
tmpRowData = append(tmpRowData, msg.RowData[i])
if len(tmpRowIDs) <= 0 {
return nil
msg.Timestamps = tmpTimestamps
msg.RowIDs = tmpRowIDs
msg.RowData = tmpRowData
Normal file
Normal file
@ -0,0 +1,61 @@
package querynode
import (
type gcNode struct {
replica collectionReplica
func (gcNode *gcNode) Name() string {
return "gcNode"
func (gcNode *gcNode) Operate(in []*Msg) []*Msg {
//fmt.Println("Do gcNode operation")
if len(in) != 1 {
log.Println("Invalid operate message input in gcNode, input length = ", len(in))
// TODO: add error handling
gcMsg, ok := (*in[0]).(*gcMsg)
if !ok {
log.Println("type assertion failed for gcMsg")
// TODO: add error handling
// drop collections
for _, collectionID := range gcMsg.gcRecord.collections {
err := gcNode.replica.removeCollection(collectionID)
if err != nil {
// drop partitions
for _, partition := range gcMsg.gcRecord.partitions {
err := gcNode.replica.removePartition(partition.collectionID, partition.partitionTag)
if err != nil {
return nil
func newGCNode(replica collectionReplica) *gcNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
baseNode := BaseNode{}
return &gcNode{
BaseNode: baseNode,
replica: replica,
@ -90,6 +90,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
var res Msg = &serviceTimeMsg{
gcRecord: iMsg.gcRecord,
timeRange: iMsg.timeRange,
return []*Msg{&res}
@ -16,6 +16,7 @@ type key2SegMsg struct {
type ddMsg struct {
collectionRecords map[string][]metaOperateRecord
partitionRecords map[string][]metaOperateRecord
gcRecord *gcRecord
timeRange TimeRange
@ -26,6 +27,7 @@ type metaOperateRecord struct {
type insertMsg struct {
insertMessages []*msgstream.InsertMsg
gcRecord *gcRecord
timeRange TimeRange
@ -35,6 +37,12 @@ type deleteMsg struct {
type serviceTimeMsg struct {
gcRecord *gcRecord
timeRange TimeRange
type gcMsg struct {
gcRecord *gcRecord
timeRange TimeRange
@ -55,42 +63,39 @@ type DeletePreprocessData struct {
count int32
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
return ksMsg.timeRange.timestampMax
// TODO: replace partitionWithID by partition id
type partitionWithID struct {
partitionTag string
collectionID UniqueID
func (ksMsg *key2SegMsg) DownStreamNodeIdx() int {
return 0
type gcRecord struct {
// collections and partitions to be dropped
collections []UniqueID
// TODO: use partition id
partitions []partitionWithID
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
return ksMsg.timeRange.timestampMax
func (suMsg *ddMsg) TimeTick() Timestamp {
return suMsg.timeRange.timestampMax
func (suMsg *ddMsg) DownStreamNodeIdx() int {
return 0
func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
func (iMsg *insertMsg) DownStreamNodeIdx() int {
return 0
func (dMsg *deleteMsg) TimeTick() Timestamp {
return dMsg.timeRange.timestampMax
func (dMsg *deleteMsg) DownStreamNodeIdx() int {
return 0
func (stMsg *serviceTimeMsg) TimeTick() Timestamp {
return stMsg.timeRange.timestampMax
func (stMsg *serviceTimeMsg) DownStreamNodeIdx() int {
return 0
func (gcMsg *gcMsg) TimeTick() Timestamp {
return gcMsg.timeRange.timestampMax
@ -30,7 +30,12 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
// update service time
//fmt.Println("update tSafe to:", getPhysicalTime(serviceTimeMsg.timeRange.timestampMax))
return nil
var res Msg = &gcMsg{
gcRecord: serviceTimeMsg.gcRecord,
timeRange: serviceTimeMsg.timeRange,
return []*Msg{&res}
func newServiceTimeNode(replica collectionReplica) *serviceTimeNode {
@ -1,12 +1,8 @@
package flowgraph
import (
@ -29,31 +25,10 @@ func (inNode *InputNode) InStream() *msgstream.MsgStream {
// empty input and return one *Msg
func (inNode *InputNode) Operate([]*Msg) []*Msg {
func (inNode *InputNode) Operate(in []*Msg) []*Msg {
//fmt.Println("Do InputNode operation")
msgPack := (*inNode.inStream).Consume()
var childs []opentracing.Span
tracer := opentracing.GlobalTracer()
if tracer != nil && msgPack != nil {
for _, msg := range msgPack.Msgs {
if msg.Type() == internalpb.MsgType_kInsert || msg.Type() == internalpb.MsgType_kSearch {
var child opentracing.Span
ctx := msg.GetContext()
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()),
} else {
child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()))
child.SetTag("hash keys", msg.HashKeys())
child.SetTag("start time", msg.BeginTs())
child.SetTag("end time", msg.EndTs())
msg.SetContext(opentracing.ContextWithSpan(ctx, child))
childs = append(childs, child)
msgPack := (*inNode.inStream).Consume()
// TODO: add status
if msgPack == nil {
@ -67,10 +42,6 @@ func (inNode *InputNode) Operate([]*Msg) []*Msg {
timestampMax: msgPack.EndTs,
for _, child := range childs {
return []*Msg{&msgStreamMsg}
@ -4,7 +4,6 @@ import "github.com/zilliztech/milvus-distributed/internal/msgstream"
type Msg interface {
TimeTick() Timestamp
DownStreamNodeIdx() int
type MsgStreamMsg struct {
@ -1,6 +1,7 @@
package writenode
import (
@ -42,6 +43,7 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
fmt.Println("drop collection:", collectionID)
defer colReplica.mu.Unlock()
@ -50,6 +50,7 @@ func (dsService *dataSyncService) initNodes() {
var ddNode Node = newDDNode(dsService.ctx, dsService.ddChan, dsService.replica)
var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.insertChan, dsService.replica)
var gcNode Node = newGCNode(dsService.replica)
@ -58,6 +59,7 @@ func (dsService *dataSyncService) initNodes() {
// dmStreamNode
var err = dsService.fg.SetEdges(dmStreamNode.Name(),
@ -98,9 +100,17 @@ func (dsService *dataSyncService) initNodes() {
// insertBufferNode
err = dsService.fg.SetEdges(insertBufferNode.Name(),
if err != nil {
log.Fatal("set edges failed in node:", insertBufferNode.Name())
// gcNode
err = dsService.fg.SetEdges(gcNode.Name(),
if err != nil {
log.Fatal("set edges failed in node:", gcNode.Name())
@ -91,6 +91,11 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
ddNode.ddMsg = &ddMsg
gcRecord := gcRecord{
collections: make([]UniqueID, 0),
ddNode.ddMsg.gcRecord = &gcRecord
// sort tsMessages
tsMessages := msMsg.TsMessages()
@ -259,10 +264,10 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
collectionID := msg.CollectionID
err := ddNode.replica.removeCollection(collectionID)
if err != nil {
//err := ddNode.replica.removeCollection(collectionID)
//if err != nil {
// log.Println(err)
// remove collection
if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; !ok {
@ -291,6 +296,8 @@ func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.DropCollectionRequest.String())
ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Timestamp)
ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropCollectionEventType)
ddNode.ddMsg.gcRecord.collections = append(ddNode.ddMsg.gcRecord.collections, collectionID)
func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
@ -1,10 +1,8 @@
package writenode
import (
@ -34,34 +32,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
// TODO: add error handling
var childs []opentracing.Span
tracer := opentracing.GlobalTracer()
if tracer != nil {
for _, msg := range msgStreamMsg.TsMessages() {
if msg.Type() == internalPb.MsgType_kInsert {
var child opentracing.Span
ctx := msg.GetContext()
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("pass filter node",
} else {
child = tracer.StartSpan("pass filter node")
child.SetTag("hash keys", msg.HashKeys())
child.SetTag("start time", msg.BeginTs())
child.SetTag("end time", msg.EndTs())
msg.SetContext(opentracing.ContextWithSpan(ctx, child))
childs = append(childs, child)
ddMsg, ok := (*in[1]).(*ddMsg)
if !ok {
log.Println("type assertion failed for ddMsg")
// TODO: add error handling
fdmNode.ddMsg = ddMsg
var iMsg = insertMsg{
@ -82,20 +57,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
for key, msg := range msgStreamMsg.TsMessages() {
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case internalPb.MsgType_kInsert:
var ctx2 context.Context
if childs != nil {
if childs[key] != nil {
ctx2 = opentracing.ContextWithSpan(msg.GetContext(), childs[key])
} else {
ctx2 = context.Background()
resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
// case internalPb.MsgType_kDelete:
@ -104,11 +70,9 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
log.Println("Non supporting message type:", msg.Type())
var res Msg = &iMsg
for _, child := range childs {
iMsg.gcRecord = ddMsg.gcRecord
var res Msg = &iMsg
return []*Msg{&res}
@ -133,14 +97,31 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
tmpTimestamps := make([]Timestamp, 0)
tmpRowIDs := make([]int64, 0)
tmpRowData := make([]*commonpb.Blob, 0)
targetTimestamp := records[len(records)-1].timestamp
// calculate valid time range
timeBegin := Timestamp(0)
timeEnd := Timestamp(math.MaxUint64)
for _, record := range records {
if record.createOrDrop && timeBegin < record.timestamp {
timeBegin = record.timestamp
if !record.createOrDrop && timeEnd > record.timestamp {
timeEnd = record.timestamp
for i, t := range msg.Timestamps {
if t >= targetTimestamp {
if t >= timeBegin && t <= timeEnd {
tmpTimestamps = append(tmpTimestamps, t)
tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i])
tmpRowData = append(tmpRowData, msg.RowData[i])
if len(tmpRowIDs) <= 0 {
return nil
msg.Timestamps = tmpTimestamps
msg.RowIDs = tmpRowIDs
msg.RowData = tmpRowData
Normal file
Normal file
@ -0,0 +1,53 @@
package writenode
import (
type gcNode struct {
replica collectionReplica
func (gcNode *gcNode) Name() string {
return "gcNode"
func (gcNode *gcNode) Operate(in []*Msg) []*Msg {
//fmt.Println("Do gcNode operation")
if len(in) != 1 {
log.Println("Invalid operate message input in gcNode, input length = ", len(in))
// TODO: add error handling
gcMsg, ok := (*in[0]).(*gcMsg)
if !ok {
log.Println("type assertion failed for gcMsg")
// TODO: add error handling
// drop collections
for _, collectionID := range gcMsg.gcRecord.collections {
err := gcNode.replica.removeCollection(collectionID)
if err != nil {
return nil
func newGCNode(replica collectionReplica) *gcNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
baseNode := BaseNode{}
return &gcNode{
BaseNode: baseNode,
replica: replica,
@ -4,15 +4,11 @@ import (
oplog "github.com/opentracing/opentracing-go/log"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
@ -100,23 +96,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
// iMsg is insertMsg
// 1. iMsg -> buffer
for _, msg := range iMsg.insertMessages {
ctx := msg.GetContext()
var span opentracing.Span
if ctx != nil {
span, _ = opentracing.StartSpanFromContext(ctx, fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs()))
} else {
span = opentracing.StartSpan(fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs()))
span.SetTag("hash keys", msg.HashKeys())
span.SetTag("start time", msg.BeginTs())
span.SetTag("end time", msg.EndTs())
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
log.Println("Error: misaligned messages detected")
currentSegID := msg.GetSegmentID()
collectionName := msg.GetCollectionName()
span.LogFields(oplog.Int("segment id", int(currentSegID)))
idata, ok := ibNode.insertBuffer.insertData[currentSegID]
if !ok {
@ -125,21 +110,6 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
// Timestamps
_, ok = idata.Data[1].(*storage.Int64FieldData)
if !ok {
idata.Data[1] = &storage.Int64FieldData{
Data: []int64{},
NumRows: 0,
tsData := idata.Data[1].(*storage.Int64FieldData)
for _, ts := range msg.Timestamps {
tsData.Data = append(tsData.Data, int64(ts))
tsData.NumRows += len(msg.Timestamps)
span.LogFields(oplog.Int("tsData numRows", tsData.NumRows))
// 1.1 Get CollectionMeta from etcd
collection, err := ibNode.replica.getCollectionByName(collectionName)
//collSchema, err := ibNode.getCollectionSchemaByName(collectionName)
@ -388,11 +358,9 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
// 1.3 store in buffer
ibNode.insertBuffer.insertData[currentSegID] = idata
span.LogFields(oplog.String("store in buffer", "store in buffer"))
// 1.4 if full
// 1.4.1 generate binlogs
span.LogFields(oplog.String("generate binlogs", "generate binlogs"))
if ibNode.insertBuffer.full(currentSegID) {
log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID))
// partitionTag -> partitionID
@ -461,7 +429,6 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
ibNode.outCh <- inBinlogMsg
if len(iMsg.insertMessages) > 0 {
@ -572,7 +539,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
log.Printf("Error: send hard time tick into pulsar channel failed, %s\n", err.Error())
return nil
var res Msg = &gcMsg{
gcRecord: iMsg.gcRecord,
timeRange: iMsg.timeRange,
return []*Msg{&res}
func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) {
@ -22,6 +22,7 @@ type (
// TODO: use partition id
partitionRecords map[string][]metaOperateRecord
flushMessages []*msgstream.FlushMsg
gcRecord *gcRecord
timeRange TimeRange
@ -33,6 +34,7 @@ type (
insertMsg struct {
insertMessages []*msgstream.InsertMsg
flushMessages []*msgstream.FlushMsg
gcRecord *gcRecord
timeRange TimeRange
@ -40,36 +42,33 @@ type (
deleteMessages []*msgstream.DeleteMsg
timeRange TimeRange
gcMsg struct {
gcRecord *gcRecord
timeRange TimeRange
gcRecord struct {
collections []UniqueID
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
return ksMsg.timeRange.timestampMax
func (ksMsg *key2SegMsg) DownStreamNodeIdx() int {
return 0
func (suMsg *ddMsg) TimeTick() Timestamp {
return suMsg.timeRange.timestampMax
func (suMsg *ddMsg) DownStreamNodeIdx() int {
return 0
func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
func (iMsg *insertMsg) DownStreamNodeIdx() int {
return 0
func (dMsg *deleteMsg) TimeTick() Timestamp {
return dMsg.timeRange.timestampMax
func (dMsg *deleteMsg) DownStreamNodeIdx() int {
return 0
func (gcMsg *gcMsg) TimeTick() Timestamp {
return gcMsg.timeRange.timestampMax
@ -2,12 +2,6 @@ package writenode
import (
type WriteNode struct {
@ -17,8 +11,6 @@ type WriteNode struct {
flushSyncService *flushSyncService
metaService *metaService
replica collectionReplica
tracer opentracing.Tracer
closer io.Closer
func NewWriteNode(ctx context.Context, writeNodeID uint64) *WriteNode {
@ -46,22 +38,6 @@ func Init() {
func (node *WriteNode) Start() error {
cfg := &config.Configuration{
ServiceName: "tracing",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
Reporter: &config.ReporterConfig{
LogSpans: true,
var err error
node.tracer, node.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
// TODO GOOSE Init Size??
chanSize := 100
Reference in New Issue
Block a user