mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 19:39:21 +08:00
Fix msgstream deadlock when loadCollection
Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
parent
563a8d66ae
commit
2ca53fa668
51
internal/core/src/query/ScalarIndex.h
Normal file
51
internal/core/src/query/ScalarIndex.h
Normal file
@ -0,0 +1,51 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#pragma once
|
||||
#include "knowhere/index/structured_index_simple/StructuredIndexSort.h"
|
||||
#include "common/Span.h"
|
||||
#include "common/FieldMeta.h"
|
||||
#include <memory>
|
||||
|
||||
namespace milvus::query {
|
||||
|
||||
template <typename T>
|
||||
inline std::unique_ptr<knowhere::scalar::StructuredIndex<T>>
|
||||
generate_scalar_index(Span<T> data) {
|
||||
auto indexing = std::make_unique<knowhere::scalar::StructuredIndexSort<T>>();
|
||||
indexing->Build(data.row_count(), data.data());
|
||||
return indexing;
|
||||
}
|
||||
|
||||
inline std::unique_ptr<knowhere::Index>
|
||||
generate_scalar_index(SpanBase data, DataType data_type) {
|
||||
Assert(!datatype_is_vector(data_type));
|
||||
switch (data_type) {
|
||||
case DataType::BOOL:
|
||||
return generate_scalar_index(Span<bool>(data));
|
||||
case DataType::INT8:
|
||||
return generate_scalar_index(Span<int8_t>(data));
|
||||
case DataType::INT16:
|
||||
return generate_scalar_index(Span<int16_t>(data));
|
||||
case DataType::INT32:
|
||||
return generate_scalar_index(Span<int32_t>(data));
|
||||
case DataType::INT64:
|
||||
return generate_scalar_index(Span<int64_t>(data));
|
||||
case DataType::FLOAT:
|
||||
return generate_scalar_index(Span<float>(data));
|
||||
case DataType::DOUBLE:
|
||||
return generate_scalar_index(Span<double>(data));
|
||||
default:
|
||||
PanicInfo("unsupported type");
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace milvus::query
|
@ -99,8 +99,8 @@ BinarySearchBruteForceFast(MetricType metric_type,
|
||||
}
|
||||
|
||||
SubQueryResult
|
||||
FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset,
|
||||
const float* chunk_data,
|
||||
FloatSearchBruteForce(const dataset::QueryDataset& query_dataset,
|
||||
const void* chunk_data_raw,
|
||||
int64_t size_per_chunk,
|
||||
const faiss::BitsetView& bitset) {
|
||||
auto metric_type = query_dataset.metric_type;
|
||||
@ -108,25 +108,29 @@ FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset,
|
||||
auto topk = query_dataset.topk;
|
||||
auto dim = query_dataset.dim;
|
||||
SubQueryResult sub_qr(num_queries, topk, metric_type);
|
||||
auto query_data = reinterpret_cast<const float*>(query_dataset.query_data);
|
||||
auto chunk_data = reinterpret_cast<const float*>(chunk_data_raw);
|
||||
|
||||
if (metric_type == MetricType::METRIC_L2) {
|
||||
faiss::float_maxheap_array_t buf{(size_t)num_queries, (size_t)topk, sub_qr.get_labels(), sub_qr.get_values()};
|
||||
faiss::knn_L2sqr(query_dataset.query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset);
|
||||
faiss::knn_L2sqr(query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset);
|
||||
return sub_qr;
|
||||
} else {
|
||||
faiss::float_minheap_array_t buf{(size_t)num_queries, (size_t)topk, sub_qr.get_labels(), sub_qr.get_values()};
|
||||
faiss::knn_inner_product(query_dataset.query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset);
|
||||
faiss::knn_inner_product(query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset);
|
||||
return sub_qr;
|
||||
}
|
||||
}
|
||||
|
||||
// Brute-force top-k search of binary query vectors against one chunk of binary vectors.
//
// query_dataset:  metric type, dimension, topk, number of queries and the raw query buffer;
//                 `query_data` is read as a byte (uint8_t) buffer.
// chunk_data_raw: raw chunk buffer, read as a byte (uint8_t) buffer.
// size_per_chunk: forwarded unchanged to BinarySearchBruteForceFast.
// bitset:         forwarded unchanged to BinarySearchBruteForceFast.
// Returns whatever BinarySearchBruteForceFast produces for these inputs.
SubQueryResult
BinarySearchBruteForce(const dataset::QueryDataset& query_dataset,
                       const void* chunk_data_raw,
                       int64_t size_per_chunk,
                       const faiss::BitsetView& bitset) {
    // TODO: refactor the internal function
    // Both buffers arrive type-erased; the binary kernel works on bytes.
    auto query_data = static_cast<const uint8_t*>(query_dataset.query_data);
    auto chunk_data = static_cast<const uint8_t*>(chunk_data_raw);
    return BinarySearchBruteForceFast(query_dataset.metric_type, query_dataset.dim, chunk_data, size_per_chunk,
                                      query_dataset.topk, query_dataset.num_queries, query_data, bitset);
}
|
||||
} // namespace milvus::query
|
||||
|
@ -19,14 +19,14 @@
|
||||
namespace milvus::query {
|
||||
|
||||
SubQueryResult
|
||||
BinarySearchBruteForce(const dataset::BinaryQueryDataset& query_dataset,
|
||||
const uint8_t* binary_chunk,
|
||||
BinarySearchBruteForce(const dataset::QueryDataset& query_dataset,
|
||||
const void* chunk_data_raw,
|
||||
int64_t size_per_chunk,
|
||||
const faiss::BitsetView& bitset);
|
||||
|
||||
SubQueryResult
|
||||
FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset,
|
||||
const float* chunk_data,
|
||||
FloatSearchBruteForce(const dataset::QueryDataset& query_dataset,
|
||||
const void* chunk_data_raw,
|
||||
int64_t size_per_chunk,
|
||||
const faiss::BitsetView& bitset);
|
||||
|
||||
|
@ -69,7 +69,7 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment,
|
||||
// std::vector<int64_t> final_uids(total_count, -1);
|
||||
// std::vector<float> final_dis(total_count, std::numeric_limits<float>::max());
|
||||
SubQueryResult final_qr(num_queries, topK, metric_type);
|
||||
dataset::FloatQueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data};
|
||||
dataset::QueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data};
|
||||
|
||||
auto max_indexed_id = indexing_record.get_finished_ack();
|
||||
const auto& field_indexing = indexing_record.get_vec_field_indexing(vecfield_offset);
|
||||
@ -158,7 +158,7 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment,
|
||||
auto total_count = topK * num_queries;
|
||||
|
||||
// step 3: small indexing search
|
||||
query::dataset::BinaryQueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data};
|
||||
query::dataset::QueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data};
|
||||
|
||||
auto vec_ptr = record.get_field_data<BinaryVector>(vecfield_offset);
|
||||
|
||||
|
@ -12,7 +12,7 @@
|
||||
#include "SearchOnIndex.h"
|
||||
namespace milvus::query {
|
||||
SubQueryResult
|
||||
SearchOnIndex(const dataset::FloatQueryDataset& query_dataset,
|
||||
SearchOnIndex(const dataset::QueryDataset& query_dataset,
|
||||
const knowhere::VecIndex& indexing,
|
||||
const knowhere::Config& search_conf,
|
||||
const faiss::BitsetView& bitset) {
|
||||
|
@ -19,7 +19,7 @@
|
||||
|
||||
namespace milvus::query {
|
||||
SubQueryResult
|
||||
SearchOnIndex(const dataset::FloatQueryDataset& query_dataset,
|
||||
SearchOnIndex(const dataset::QueryDataset& query_dataset,
|
||||
const knowhere::VecIndex& indexing,
|
||||
const knowhere::Config& search_conf,
|
||||
const faiss::BitsetView& bitset);
|
||||
|
@ -15,20 +15,12 @@
|
||||
namespace milvus::query {
|
||||
namespace dataset {
|
||||
|
||||
struct FloatQueryDataset {
|
||||
struct QueryDataset {
|
||||
MetricType metric_type;
|
||||
int64_t num_queries;
|
||||
int64_t topk;
|
||||
int64_t dim;
|
||||
const float* query_data;
|
||||
};
|
||||
|
||||
struct BinaryQueryDataset {
|
||||
MetricType metric_type;
|
||||
int64_t num_queries;
|
||||
int64_t topk;
|
||||
int64_t dim;
|
||||
const uint8_t* query_data;
|
||||
const void* query_data;
|
||||
};
|
||||
|
||||
} // namespace dataset
|
||||
|
@ -80,7 +80,7 @@ class VectorBase {
|
||||
grow_to_at_least(int64_t element_count) = 0;
|
||||
|
||||
virtual void
|
||||
set_data_raw(ssize_t element_offset, void* source, ssize_t element_count) = 0;
|
||||
set_data_raw(ssize_t element_offset, const void* source, ssize_t element_count) = 0;
|
||||
|
||||
virtual SpanBase
|
||||
get_span_base(int64_t chunk_id) const = 0;
|
||||
@ -142,7 +142,7 @@ class ConcurrentVectorImpl : public VectorBase {
|
||||
}
|
||||
|
||||
void
|
||||
set_data_raw(ssize_t element_offset, void* source, ssize_t element_count) override {
|
||||
set_data_raw(ssize_t element_offset, const void* source, ssize_t element_count) override {
|
||||
set_data(element_offset, static_cast<const Type*>(source), element_count);
|
||||
}
|
||||
|
||||
|
@ -32,6 +32,11 @@ struct RowBasedRawData {
|
||||
int64_t count;
|
||||
};
|
||||
|
||||
// Column-major payload for an insert: one raw byte buffer per schema field.
struct ColumnBasedRawData {
|
||||
// Indexed by field offset; the column-based Insert asserts that each buffer holds
// exactly `element_sizeof * row_count` bytes for its field.
std::vector<aligned_vector<uint8_t>> columns_;
|
||||
// Number of rows carried by every column; the column-based Insert asserts it equals its `size` argument.
int64_t count;
|
||||
};
|
||||
|
||||
int
|
||||
TestABI();
|
||||
|
||||
@ -55,6 +60,13 @@ class SegmentGrowing : public SegmentInternalInterface {
|
||||
const Timestamp* timestamps,
|
||||
const RowBasedRawData& values) = 0;
|
||||
|
||||
virtual void
|
||||
Insert(int64_t reserved_offset,
|
||||
int64_t size,
|
||||
const int64_t* row_ids,
|
||||
const Timestamp* timestamps,
|
||||
const ColumnBasedRawData& values) = 0;
|
||||
|
||||
virtual int64_t
|
||||
PreDelete(int64_t size) = 0;
|
||||
|
||||
|
@ -142,7 +142,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_begin,
|
||||
auto sizeof_infos = schema_->get_sizeof_infos();
|
||||
std::vector<int> offset_infos(schema_->size() + 1, 0);
|
||||
std::partial_sum(sizeof_infos.begin(), sizeof_infos.end(), offset_infos.begin() + 1);
|
||||
std::vector<std::vector<char>> entities(schema_->size());
|
||||
std::vector<aligned_vector<uint8_t>> entities(schema_->size());
|
||||
|
||||
for (int fid = 0; fid < schema_->size(); ++fid) {
|
||||
auto len = sizeof_infos[fid];
|
||||
@ -165,23 +165,32 @@ SegmentGrowingImpl::Insert(int64_t reserved_begin,
|
||||
}
|
||||
}
|
||||
|
||||
do_insert(reserved_begin, size, uids.data(), timestamps.data(), entities);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void
|
||||
SegmentGrowingImpl::do_insert(int64_t reserved_begin,
|
||||
int64_t size,
|
||||
const idx_t* row_ids,
|
||||
const Timestamp* timestamps,
|
||||
const std::vector<aligned_vector<uint8_t>>& columns_data) {
|
||||
// step 4: fill into Segment.ConcurrentVector
|
||||
record_.timestamps_.set_data(reserved_begin, timestamps.data(), size);
|
||||
record_.uids_.set_data(reserved_begin, uids.data(), size);
|
||||
record_.timestamps_.set_data(reserved_begin, timestamps, size);
|
||||
record_.uids_.set_data(reserved_begin, row_ids, size);
|
||||
for (int fid = 0; fid < schema_->size(); ++fid) {
|
||||
auto field_offset = FieldOffset(fid);
|
||||
record_.get_field_data_base(field_offset)->set_data_raw(reserved_begin, entities[fid].data(), size);
|
||||
record_.get_field_data_base(field_offset)->set_data_raw(reserved_begin, columns_data[fid].data(), size);
|
||||
}
|
||||
|
||||
for (int i = 0; i < uids.size(); ++i) {
|
||||
auto uid = uids[i];
|
||||
for (int i = 0; i < size; ++i) {
|
||||
auto row_id = row_ids[i];
|
||||
// NOTE: this must be the last step, cannot be put above
|
||||
uid2offset_.insert(std::make_pair(uid, reserved_begin + i));
|
||||
uid2offset_.insert(std::make_pair(row_id, reserved_begin + i));
|
||||
}
|
||||
|
||||
record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size);
|
||||
indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / size_per_chunk_, record_);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
@ -274,5 +283,167 @@ SegmentGrowingImpl::vector_search(int64_t vec_count,
|
||||
SearchOnGrowing(*this, vec_count, query_info, query_data, query_count, bitset, output);
|
||||
}
|
||||
}
|
||||
void
|
||||
SegmentGrowingImpl::bulk_subscript(FieldOffset field_offset,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output) const {
|
||||
// TODO: support more types
|
||||
auto vec_ptr = record_.get_field_data_base(field_offset);
|
||||
auto& field_meta = schema_->operator[](field_offset);
|
||||
if (field_meta.is_vector()) {
|
||||
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
|
||||
bulk_subscript_impl<FloatVector>(field_meta.get_sizeof(), *vec_ptr, seg_offsets, count, output);
|
||||
} else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
|
||||
bulk_subscript_impl<BinaryVector>(field_meta.get_sizeof(), *vec_ptr, seg_offsets, count, output);
|
||||
} else {
|
||||
PanicInfo("logical error");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
Assert(!field_meta.is_vector());
|
||||
switch (field_meta.get_data_type()) {
|
||||
case DataType::BOOL: {
|
||||
bulk_subscript_impl<bool>(*vec_ptr, seg_offsets, count, false, output);
|
||||
break;
|
||||
}
|
||||
case DataType::INT8: {
|
||||
bulk_subscript_impl<int8_t>(*vec_ptr, seg_offsets, count, 0, output);
|
||||
break;
|
||||
}
|
||||
|
||||
case DataType::INT16: {
|
||||
bulk_subscript_impl<int16_t>(*vec_ptr, seg_offsets, count, 0, output);
|
||||
break;
|
||||
}
|
||||
|
||||
case DataType::INT32: {
|
||||
bulk_subscript_impl<int32_t>(*vec_ptr, seg_offsets, count, 0, output);
|
||||
break;
|
||||
}
|
||||
|
||||
case DataType::INT64: {
|
||||
bulk_subscript_impl<int64_t>(*vec_ptr, seg_offsets, count, 0, output);
|
||||
break;
|
||||
}
|
||||
|
||||
case DataType::FLOAT: {
|
||||
bulk_subscript_impl<float>(*vec_ptr, seg_offsets, count, 0, output);
|
||||
break;
|
||||
}
|
||||
|
||||
case DataType::DOUBLE: {
|
||||
bulk_subscript_impl<double>(*vec_ptr, seg_offsets, count, 0, output);
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
PanicInfo("unsupported type");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void
|
||||
SegmentGrowingImpl::bulk_subscript_impl(int64_t element_sizeof,
|
||||
const VectorBase& vec_raw,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output_raw) const {
|
||||
static_assert(IsVector<T>);
|
||||
auto vec_ptr = dynamic_cast<const ConcurrentVector<T>*>(&vec_raw);
|
||||
Assert(vec_ptr);
|
||||
auto& vec = *vec_ptr;
|
||||
std::vector<uint8_t> empty(element_sizeof, 0);
|
||||
auto output_base = reinterpret_cast<char*>(output_raw);
|
||||
for (int i = 0; i < count; ++i) {
|
||||
auto dst = output_base + i * element_sizeof;
|
||||
auto offset = seg_offsets[i];
|
||||
const uint8_t* src = offset == -1 ? empty.data() : (const uint8_t*)vec.get_element(offset);
|
||||
memcpy(dst, src, element_sizeof);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void
|
||||
SegmentGrowingImpl::bulk_subscript_impl(
|
||||
const VectorBase& vec_raw, const int64_t* seg_offsets, int64_t count, T default_value, void* output_raw) const {
|
||||
static_assert(IsScalar<T>);
|
||||
auto vec_ptr = dynamic_cast<const ConcurrentVector<T>*>(&vec_raw);
|
||||
Assert(vec_ptr);
|
||||
auto& vec = *vec_ptr;
|
||||
auto output = reinterpret_cast<T*>(output_raw);
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = seg_offsets[i];
|
||||
output[i] = offset == -1 ? default_value : vec[offset];
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
SegmentGrowingImpl::bulk_subscript(SystemFieldType system_type,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output) const {
|
||||
switch (system_type) {
|
||||
case SystemFieldType::Timestamp:
|
||||
PanicInfo("timestamp unsupported");
|
||||
case SystemFieldType::RowId:
|
||||
bulk_subscript_impl<int64_t>(this->record_.uids_, seg_offsets, count, -1, output);
|
||||
break;
|
||||
default:
|
||||
PanicInfo("unknown subscript fields");
|
||||
}
|
||||
}
|
||||
|
||||
// copied from stack overflow
|
||||
// Returns the permutation that sorts `src[0 .. size)` in ascending order.
//
// Element k of the result is the index in `src` of the k-th smallest value.
// The sort is stable, so equal values keep their original relative order.
//
// src:  pointer to `size` comparable elements (only operator< is used).
// size: number of elements; 0 yields an empty vector.
template <typename T>
std::vector<size_t>
sort_indexes(const T* src, int64_t size) {
    // initialize original index locations: 0, 1, ..., size - 1
    std::vector<size_t> idx(size);
    // Qualified call and size_t seed: no reliance on ADL, no int counter.
    std::iota(idx.begin(), idx.end(), size_t{0});

    // sort indexes based on comparing values in src,
    // using std::stable_sort instead of std::sort
    // to avoid unnecessary index re-orderings
    // when src contains elements of equal values
    std::stable_sort(idx.begin(), idx.end(), [src](size_t lhs, size_t rhs) { return src[lhs] < src[rhs]; });

    return idx;
}
|
||||
|
||||
void
|
||||
SegmentGrowingImpl::Insert(int64_t reserved_offset,
|
||||
int64_t size,
|
||||
const int64_t* row_ids_raw,
|
||||
const Timestamp* timestamps_raw,
|
||||
const ColumnBasedRawData& values) {
|
||||
auto indexes = sort_indexes(timestamps_raw, size);
|
||||
std::vector<Timestamp> timestamps(size);
|
||||
std::vector<idx_t> row_ids(size);
|
||||
Assert(values.count == size);
|
||||
for (int64_t i = 0; i < size; ++i) {
|
||||
auto offset = indexes[i];
|
||||
timestamps[i] = timestamps_raw[offset];
|
||||
row_ids[i] = row_ids_raw[i];
|
||||
}
|
||||
std::vector<aligned_vector<uint8_t>> columns_data;
|
||||
|
||||
for (int field_offset = 0; field_offset < schema_->size(); ++field_offset) {
|
||||
auto& field_meta = schema_->operator[](FieldOffset(field_offset));
|
||||
aligned_vector<uint8_t> column;
|
||||
auto element_sizeof = field_meta.get_sizeof();
|
||||
auto& src_vec = values.columns_[field_offset];
|
||||
Assert(src_vec.size() == element_sizeof * size);
|
||||
for (int64_t i = 0; i < size; ++i) {
|
||||
auto offset = indexes[i];
|
||||
auto beg = src_vec.data() + offset * element_sizeof;
|
||||
column.insert(column.end(), beg, beg + element_sizeof);
|
||||
}
|
||||
columns_data.emplace_back(std::move(column));
|
||||
}
|
||||
do_insert(reserved_offset, size, row_ids.data(), timestamps.data(), columns_data);
|
||||
}
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "InsertRecord.h"
|
||||
#include <utility>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
namespace milvus::segcore {
|
||||
|
||||
@ -46,6 +47,13 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
const Timestamp* timestamps,
|
||||
const RowBasedRawData& values) override;
|
||||
|
||||
void
|
||||
Insert(int64_t reserved_offset,
|
||||
int64_t size,
|
||||
const int64_t* row_ids,
|
||||
const Timestamp* timestamps,
|
||||
const ColumnBasedRawData& values) override;
|
||||
|
||||
int64_t
|
||||
PreDelete(int64_t size) override;
|
||||
|
||||
@ -119,44 +127,25 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// for scalar vectors
|
||||
template <typename T>
|
||||
void
|
||||
bulk_subscript_impl(const VectorBase& vec_raw, const int64_t* seg_offsets, int64_t count, void* output_raw) const {
|
||||
static_assert(IsScalar<T>);
|
||||
auto vec_ptr = dynamic_cast<const ConcurrentVector<T>*>(&vec_raw);
|
||||
Assert(vec_ptr);
|
||||
auto& vec = *vec_ptr;
|
||||
auto output = reinterpret_cast<T*>(output_raw);
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = seg_offsets[i];
|
||||
output[i] = offset == -1 ? -1 : vec[offset];
|
||||
}
|
||||
}
|
||||
bulk_subscript_impl(
|
||||
const VectorBase& vec_raw, const int64_t* seg_offsets, int64_t count, T default_value, void* output_raw) const;
|
||||
|
||||
template <typename T>
|
||||
void
|
||||
bulk_subscript_impl(int64_t element_sizeof,
|
||||
const VectorBase& vec_raw,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output_raw) const;
|
||||
|
||||
void
|
||||
bulk_subscript(SystemFieldType system_type,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output) const override {
|
||||
switch (system_type) {
|
||||
case SystemFieldType::Timestamp:
|
||||
PanicInfo("timestamp unsupported");
|
||||
case SystemFieldType::RowId:
|
||||
bulk_subscript_impl<int64_t>(this->record_.uids_, seg_offsets, count, output);
|
||||
break;
|
||||
default:
|
||||
PanicInfo("unknown subscript fields");
|
||||
}
|
||||
}
|
||||
bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const override;
|
||||
|
||||
void
|
||||
bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override {
|
||||
// TODO: support more types
|
||||
auto vec_ptr = record_.get_field_data_base(field_offset);
|
||||
auto data_type = schema_->operator[](field_offset).get_data_type();
|
||||
Assert(data_type == DataType::INT64);
|
||||
bulk_subscript_impl<int64_t>(*vec_ptr, seg_offsets, count, output);
|
||||
}
|
||||
bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override;
|
||||
|
||||
Status
|
||||
LoadIndexing(const LoadIndexInfo& info) override;
|
||||
@ -196,6 +185,14 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
Assert(plan);
|
||||
}
|
||||
|
||||
private:
|
||||
void
|
||||
do_insert(int64_t reserved_begin,
|
||||
int64_t size,
|
||||
const idx_t* row_ids,
|
||||
const Timestamp* timestamps,
|
||||
const std::vector<aligned_vector<uint8_t>>& columns_data);
|
||||
|
||||
private:
|
||||
int64_t size_per_chunk_;
|
||||
SchemaPtr schema_;
|
||||
|
@ -23,22 +23,51 @@ SegmentInternalInterface::FillTargetEntry(const query::Plan* plan, QueryResult&
|
||||
Assert(results.result_offsets_.size() == size);
|
||||
Assert(results.row_data_.size() == 0);
|
||||
|
||||
std::vector<int64_t> target(size);
|
||||
if (plan->schema_.get_is_auto_id()) {
|
||||
// use row_id
|
||||
bulk_subscript(SystemFieldType::RowId, results.internal_seg_offsets_.data(), size, target.data());
|
||||
} else {
|
||||
auto key_offset_opt = get_schema().get_primary_key_offset();
|
||||
Assert(key_offset_opt.has_value());
|
||||
auto key_offset = key_offset_opt.value();
|
||||
bulk_subscript(key_offset, results.internal_seg_offsets_.data(), size, target.data());
|
||||
// std::vector<int64_t> row_ids(size);
|
||||
std::vector<int64_t> element_sizeofs;
|
||||
std::vector<aligned_vector<char>> blobs;
|
||||
|
||||
// fill row_ids
|
||||
{
|
||||
aligned_vector<char> blob(size * sizeof(int64_t));
|
||||
if (plan->schema_.get_is_auto_id()) {
|
||||
bulk_subscript(SystemFieldType::RowId, results.internal_seg_offsets_.data(), size, blob.data());
|
||||
} else {
|
||||
auto key_offset_opt = get_schema().get_primary_key_offset();
|
||||
Assert(key_offset_opt.has_value());
|
||||
auto key_offset = key_offset_opt.value();
|
||||
Assert(get_schema()[key_offset].get_data_type() == DataType::INT64);
|
||||
bulk_subscript(key_offset, results.internal_seg_offsets_.data(), size, blob.data());
|
||||
}
|
||||
blobs.emplace_back(std::move(blob));
|
||||
element_sizeofs.push_back(sizeof(int64_t));
|
||||
}
|
||||
|
||||
// fill other entries
|
||||
for (auto field_offset : plan->target_entries_) {
|
||||
auto& field_meta = get_schema()[field_offset];
|
||||
auto element_sizeof = field_meta.get_sizeof();
|
||||
aligned_vector<char> blob(size * element_sizeof);
|
||||
bulk_subscript(field_offset, results.internal_seg_offsets_.data(), size, blob.data());
|
||||
blobs.emplace_back(std::move(blob));
|
||||
element_sizeofs.push_back(element_sizeof);
|
||||
}
|
||||
|
||||
auto target_sizeof = std::accumulate(element_sizeofs.begin(), element_sizeofs.end(), 0);
|
||||
|
||||
for (int64_t i = 0; i < size; ++i) {
|
||||
auto row_id = target[i];
|
||||
std::vector<char> blob(sizeof(row_id));
|
||||
memcpy(blob.data(), &row_id, sizeof(row_id));
|
||||
results.row_data_.emplace_back(std::move(blob));
|
||||
int64_t element_offset = 0;
|
||||
std::vector<char> target(target_sizeof);
|
||||
for (int loc = 0; loc < blobs.size(); ++loc) {
|
||||
auto element_sizeof = element_sizeofs[loc];
|
||||
auto blob_ptr = blobs[loc].data();
|
||||
auto src = blob_ptr + element_sizeof * i;
|
||||
auto dst = target.data() + element_offset;
|
||||
memcpy(dst, src, element_sizeof);
|
||||
element_offset += element_sizeof;
|
||||
}
|
||||
assert(element_offset == target_sizeof);
|
||||
results.row_data_.emplace_back(std::move(target));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,7 @@
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#pragma once
|
||||
#include <memory>
|
||||
|
||||
#include "SegmentInterface.h"
|
||||
@ -26,6 +26,10 @@ class SegmentSealed : public SegmentInternalInterface {
|
||||
DropIndex(const FieldId field_id) = 0;
|
||||
virtual void
|
||||
DropFieldData(const FieldId field_id) = 0;
|
||||
virtual bool
|
||||
HasIndex(FieldId field_id) const = 0;
|
||||
virtual bool
|
||||
HasFieldData(FieldId field_id) const = 0;
|
||||
};
|
||||
|
||||
using SegmentSealedPtr = std::unique_ptr<SegmentSealed>;
|
||||
|
@ -11,7 +11,20 @@
|
||||
|
||||
#include "segcore/SegmentSealedImpl.h"
|
||||
#include "query/SearchOnSealed.h"
|
||||
#include "query/ScalarIndex.h"
|
||||
#include "query/SearchBruteForce.h"
|
||||
namespace milvus::segcore {
|
||||
|
||||
static inline void
|
||||
set_bit(boost::dynamic_bitset<>& bitset, FieldOffset field_offset, bool flag = true) {
|
||||
bitset[field_offset.get()] = flag;
|
||||
}
|
||||
|
||||
static inline bool
|
||||
get_bit(const boost::dynamic_bitset<>& bitset, FieldOffset field_offset) {
|
||||
return bitset[field_offset.get()];
|
||||
}
|
||||
|
||||
void
|
||||
SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) {
|
||||
// NOTE: lock only when data is ready to avoid starvation
|
||||
@ -24,14 +37,17 @@ SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) {
|
||||
Assert(row_count > 0);
|
||||
|
||||
std::unique_lock lck(mutex_);
|
||||
Assert(!get_bit(vecindex_ready_bitset_, field_offset));
|
||||
if (row_count_opt_.has_value()) {
|
||||
AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns");
|
||||
} else {
|
||||
row_count_opt_ = row_count;
|
||||
}
|
||||
Assert(!vec_indexings_.is_ready(field_offset));
|
||||
vec_indexings_.append_field_indexing(field_offset, GetMetricType(metric_type_str), info.index);
|
||||
set_field_ready(field_offset, true);
|
||||
Assert(!vecindexs_.is_ready(field_offset));
|
||||
vecindexs_.append_field_indexing(field_offset, GetMetricType(metric_type_str), info.index);
|
||||
|
||||
set_bit(vecindex_ready_bitset_, field_offset, true);
|
||||
lck.unlock();
|
||||
}
|
||||
|
||||
void
|
||||
@ -61,26 +77,40 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
|
||||
// prepare data
|
||||
auto field_offset = schema_->get_offset(field_id);
|
||||
auto& field_meta = schema_->operator[](field_offset);
|
||||
Assert(!field_meta.is_vector());
|
||||
// Assert(!field_meta.is_vector());
|
||||
auto element_sizeof = field_meta.get_sizeof();
|
||||
auto span = SpanBase(info.blob, info.row_count, element_sizeof);
|
||||
auto length_in_bytes = element_sizeof * info.row_count;
|
||||
aligned_vector<char> vec_data(length_in_bytes);
|
||||
memcpy(vec_data.data(), info.blob, length_in_bytes);
|
||||
|
||||
// generate scalar index
|
||||
std::unique_ptr<knowhere::Index> index;
|
||||
if (!field_meta.is_vector()) {
|
||||
index = query::generate_scalar_index(span, field_meta.get_data_type());
|
||||
}
|
||||
|
||||
// write data under lock
|
||||
std::unique_lock lck(mutex_);
|
||||
update_row_count(info.row_count);
|
||||
AssertInfo(field_datas_[field_offset.get()].empty(), "already exists");
|
||||
field_datas_[field_offset.get()] = std::move(vec_data);
|
||||
AssertInfo(field_datas_[field_offset.get()].empty(), "field data already exists");
|
||||
|
||||
set_field_ready(field_offset, true);
|
||||
if (field_meta.is_vector()) {
|
||||
AssertInfo(!vecindexs_.is_ready(field_offset), "field data can't be loaded when indexing exists");
|
||||
field_datas_[field_offset.get()] = std::move(vec_data);
|
||||
} else {
|
||||
AssertInfo(!scalar_indexings_[field_offset.get()], "scalar indexing not cleared");
|
||||
field_datas_[field_offset.get()] = std::move(vec_data);
|
||||
scalar_indexings_[field_offset.get()] = std::move(index);
|
||||
}
|
||||
|
||||
set_bit(field_data_ready_bitset_, field_offset, true);
|
||||
}
|
||||
}
|
||||
|
||||
int64_t
|
||||
SegmentSealedImpl::num_chunk_index(FieldOffset field_offset) const {
|
||||
// TODO: support scalar index
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
int64_t
|
||||
@ -96,7 +126,7 @@ SegmentSealedImpl::size_per_chunk() const {
|
||||
SpanBase
|
||||
SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
Assert(is_field_ready(field_offset));
|
||||
Assert(get_bit(field_data_ready_bitset_, field_offset));
|
||||
auto& field_meta = schema_->operator[](field_offset);
|
||||
auto element_sizeof = field_meta.get_sizeof();
|
||||
SpanBase base(field_datas_[field_offset.get()].data(), row_count_opt_.value(), element_sizeof);
|
||||
@ -106,7 +136,9 @@ SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) c
|
||||
// Returns the scalar index stored for `field_offset`.
//
// `chunk_id` is not consulted. Asserts that an index has actually been stored
// for the field. The returned pointer is non-owning; the index stays owned by
// `scalar_indexings_`.
const knowhere::Index*
SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const {
    // TODO: support scalar index
    auto ptr = scalar_indexings_[field_offset.get()].get();
    Assert(ptr);
    return ptr;
}
|
||||
|
||||
int64_t
|
||||
@ -137,10 +169,44 @@ SegmentSealedImpl::vector_search(int64_t vec_count,
|
||||
QueryResult& output) const {
|
||||
auto field_offset = query_info.field_offset_;
|
||||
auto& field_meta = schema_->operator[](field_offset);
|
||||
|
||||
Assert(field_meta.is_vector());
|
||||
Assert(vec_indexings_.is_ready(field_offset));
|
||||
query::SearchOnSealed(*schema_, vec_indexings_, query_info, query_data, query_count, bitset, output);
|
||||
if (get_bit(vecindex_ready_bitset_, field_offset)) {
|
||||
Assert(vecindexs_.is_ready(field_offset));
|
||||
query::SearchOnSealed(*schema_, vecindexs_, query_info, query_data, query_count, bitset, output);
|
||||
} else if (get_bit(field_data_ready_bitset_, field_offset)) {
|
||||
query::dataset::QueryDataset dataset;
|
||||
dataset.query_data = query_data;
|
||||
dataset.num_queries = query_count;
|
||||
dataset.metric_type = field_meta.get_metric_type();
|
||||
dataset.topk = query_info.topK_;
|
||||
dataset.dim = field_meta.get_dim();
|
||||
|
||||
Assert(get_bit(field_data_ready_bitset_, field_offset));
|
||||
Assert(row_count_opt_.has_value());
|
||||
auto row_count = row_count_opt_.value();
|
||||
auto chunk_data = field_datas_[field_offset.get()].data();
|
||||
|
||||
auto sub_qr = [&] {
|
||||
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
|
||||
return query::FloatSearchBruteForce(dataset, chunk_data, row_count, bitset);
|
||||
} else {
|
||||
return query::BinarySearchBruteForce(dataset, chunk_data, row_count, bitset);
|
||||
}
|
||||
}();
|
||||
|
||||
QueryResult results;
|
||||
results.result_distances_ = std::move(sub_qr.mutable_values());
|
||||
results.internal_seg_offsets_ = std::move(sub_qr.mutable_labels());
|
||||
results.topK_ = dataset.topk;
|
||||
results.num_queries_ = dataset.num_queries;
|
||||
|
||||
output = std::move(results);
|
||||
} else {
|
||||
PanicInfo("Field Data is not loaded");
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
SegmentSealedImpl::DropFieldData(const FieldId field_id) {
|
||||
if (SystemProperty::Instance().IsSystem(field_id)) {
|
||||
@ -156,10 +222,9 @@ SegmentSealedImpl::DropFieldData(const FieldId field_id) {
|
||||
} else {
|
||||
auto field_offset = schema_->get_offset(field_id);
|
||||
auto& field_meta = schema_->operator[](field_offset);
|
||||
Assert(!field_meta.is_vector());
|
||||
|
||||
std::unique_lock lck(mutex_);
|
||||
set_field_ready(field_offset, false);
|
||||
set_bit(field_data_ready_bitset_, field_offset, false);
|
||||
auto vec = std::move(field_datas_[field_offset.get()]);
|
||||
lck.unlock();
|
||||
|
||||
@ -175,7 +240,146 @@ SegmentSealedImpl::DropIndex(const FieldId field_id) {
|
||||
Assert(field_meta.is_vector());
|
||||
|
||||
std::unique_lock lck(mutex_);
|
||||
vec_indexings_.drop_field_indexing(field_offset);
|
||||
vecindexs_.drop_field_indexing(field_offset);
|
||||
set_bit(vecindex_ready_bitset_, field_offset, false);
|
||||
}
|
||||
|
||||
void
|
||||
SegmentSealedImpl::check_search(const query::Plan* plan) const {
|
||||
Assert(plan);
|
||||
Assert(plan->extra_info_opt_.has_value());
|
||||
|
||||
if (!is_system_field_ready()) {
|
||||
PanicInfo("System Field RowID is not loaded");
|
||||
}
|
||||
|
||||
auto& request_fields = plan->extra_info_opt_.value().involved_fields_;
|
||||
auto field_ready_bitset = field_data_ready_bitset_ | vecindex_ready_bitset_;
|
||||
Assert(request_fields.size() == field_ready_bitset.size());
|
||||
auto absent_fields = request_fields - field_ready_bitset;
|
||||
|
||||
if (absent_fields.any()) {
|
||||
auto field_offset = FieldOffset(absent_fields.find_first());
|
||||
auto& field_meta = schema_->operator[](field_offset);
|
||||
PanicInfo("User Field(" + field_meta.get_name().get() + ") is not loaded");
|
||||
}
|
||||
}
|
||||
|
||||
// Builds an empty sealed segment for `schema`: both ready-bitsets, the scalar
// index slots and the per-field data buffers are sized to schema->size().
// All sizes come from the `schema` argument rather than the schema_ member, so
// they do not depend on the order in which members are initialized.
// NOTE(review): the initializers are listed in what appears to be the member
// declaration order - confirm against the class definition.
SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema)
    : field_data_ready_bitset_(schema->size()),
      vecindex_ready_bitset_(schema->size()),
      scalar_indexings_(schema->size()),
      field_datas_(schema->size()),
      schema_(schema) {
}
|
||||
void
|
||||
SegmentSealedImpl::bulk_subscript(SystemFieldType system_type,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output) const {
|
||||
Assert(is_system_field_ready());
|
||||
Assert(system_type == SystemFieldType::RowId);
|
||||
bulk_subscript_impl<int64_t>(row_ids_.data(), seg_offsets, count, output);
|
||||
}
|
||||
template <typename T>
|
||||
void
|
||||
SegmentSealedImpl::bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw) {
|
||||
static_assert(IsScalar<T>);
|
||||
auto src = reinterpret_cast<const T*>(src_raw);
|
||||
auto dst = reinterpret_cast<T*>(dst_raw);
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = seg_offsets[i];
|
||||
dst[i] = offset == -1 ? -1 : src[offset];
|
||||
}
|
||||
}
|
||||
|
||||
// for vector
|
||||
void
|
||||
SegmentSealedImpl::bulk_subscript_impl(
|
||||
int64_t element_sizeof, const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw) {
|
||||
auto src_vec = reinterpret_cast<const char*>(src_raw);
|
||||
auto dst_vec = reinterpret_cast<char*>(dst_raw);
|
||||
std::vector<char> none(element_sizeof, 0);
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = seg_offsets[i];
|
||||
auto dst = dst_vec + i * element_sizeof;
|
||||
const char* src;
|
||||
if (offset != 0) {
|
||||
src = src_vec + element_sizeof * offset;
|
||||
} else {
|
||||
src = none.data();
|
||||
}
|
||||
memcpy(dst, src, element_sizeof);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
SegmentSealedImpl::bulk_subscript(FieldOffset field_offset,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output) const {
|
||||
Assert(get_bit(field_data_ready_bitset_, field_offset));
|
||||
auto& field_meta = schema_->operator[](field_offset);
|
||||
auto src_vec = field_datas_[field_offset.get()].data();
|
||||
switch (field_meta.get_data_type()) {
|
||||
case DataType::BOOL: {
|
||||
bulk_subscript_impl<bool>(src_vec, seg_offsets, count, output);
|
||||
break;
|
||||
}
|
||||
case DataType::INT8: {
|
||||
bulk_subscript_impl<int8_t>(src_vec, seg_offsets, count, output);
|
||||
break;
|
||||
}
|
||||
case DataType::INT16: {
|
||||
bulk_subscript_impl<int16_t>(src_vec, seg_offsets, count, output);
|
||||
break;
|
||||
}
|
||||
case DataType::INT32: {
|
||||
bulk_subscript_impl<int32_t>(src_vec, seg_offsets, count, output);
|
||||
break;
|
||||
}
|
||||
case DataType::INT64: {
|
||||
bulk_subscript_impl<int64_t>(src_vec, seg_offsets, count, output);
|
||||
break;
|
||||
}
|
||||
case DataType::FLOAT: {
|
||||
bulk_subscript_impl<float>(src_vec, seg_offsets, count, output);
|
||||
break;
|
||||
}
|
||||
case DataType::DOUBLE: {
|
||||
bulk_subscript_impl<double>(src_vec, seg_offsets, count, output);
|
||||
break;
|
||||
}
|
||||
|
||||
case DataType::VECTOR_FLOAT:
|
||||
case DataType::VECTOR_BINARY: {
|
||||
bulk_subscript_impl(field_meta.get_sizeof(), src_vec, seg_offsets, count, output);
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
PanicInfo("unsupported");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
SegmentSealedImpl::HasIndex(FieldId field_id) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
Assert(!SystemProperty::Instance().IsSystem(field_id));
|
||||
auto field_offset = schema_->get_offset(field_id);
|
||||
return get_bit(vecindex_ready_bitset_, field_offset);
|
||||
}
|
||||
|
||||
bool
|
||||
SegmentSealedImpl::HasFieldData(FieldId field_id) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
if (SystemProperty::Instance().IsSystem(field_id)) {
|
||||
return is_system_field_ready();
|
||||
} else {
|
||||
auto field_offset = schema_->get_offset(field_id);
|
||||
return get_bit(field_data_ready_bitset_, field_offset);
|
||||
}
|
||||
}
|
||||
|
||||
SegmentSealedPtr
|
||||
|
@ -14,13 +14,12 @@
|
||||
#include "SealedIndexingRecord.h"
|
||||
#include <map>
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
|
||||
namespace milvus::segcore {
|
||||
class SegmentSealedImpl : public SegmentSealed {
|
||||
public:
|
||||
explicit SegmentSealedImpl(SchemaPtr schema)
|
||||
: schema_(schema), field_datas_(schema->size()), field_ready_bitset_(schema->size()) {
|
||||
}
|
||||
explicit SegmentSealedImpl(SchemaPtr schema);
|
||||
void
|
||||
LoadIndex(const LoadIndexInfo& info) override;
|
||||
void
|
||||
@ -30,6 +29,11 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
void
|
||||
DropFieldData(const FieldId field_id) override;
|
||||
|
||||
bool
|
||||
HasIndex(FieldId field_id) const override;
|
||||
bool
|
||||
HasFieldData(FieldId field_id) const override;
|
||||
|
||||
public:
|
||||
int64_t
|
||||
GetMemoryUsageInBytes() const override;
|
||||
@ -62,56 +66,24 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
// Calculate: output[i] = Vec[seg_offset[i]],
|
||||
// where Vec is determined from field_offset
|
||||
void
|
||||
bulk_subscript(SystemFieldType system_type,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* output) const override {
|
||||
Assert(is_system_field_ready());
|
||||
Assert(system_type == SystemFieldType::RowId);
|
||||
bulk_subscript_impl<int64_t>(row_ids_.data(), seg_offsets, count, output);
|
||||
}
|
||||
bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const override;
|
||||
|
||||
// Calculate: output[i] = Vec[seg_offset[i]]
|
||||
// where Vec is determined from field_offset
|
||||
void
|
||||
bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override {
|
||||
Assert(is_field_ready(field_offset));
|
||||
auto& field_meta = schema_->operator[](field_offset);
|
||||
Assert(field_meta.get_data_type() == DataType::INT64);
|
||||
bulk_subscript_impl<int64_t>(field_datas_[field_offset.get()].data(), seg_offsets, count, output);
|
||||
}
|
||||
bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override;
|
||||
|
||||
void
|
||||
check_search(const query::Plan* plan) const override {
|
||||
Assert(plan);
|
||||
Assert(plan->extra_info_opt_.has_value());
|
||||
|
||||
if (!is_system_field_ready()) {
|
||||
PanicInfo("System Field RowID is not loaded");
|
||||
}
|
||||
|
||||
auto& request_fields = plan->extra_info_opt_.value().involved_fields_;
|
||||
Assert(request_fields.size() == field_ready_bitset_.size());
|
||||
auto absent_fields = request_fields - field_ready_bitset_;
|
||||
if (absent_fields.any()) {
|
||||
auto field_offset = FieldOffset(absent_fields.find_first());
|
||||
auto& field_meta = schema_->operator[](field_offset);
|
||||
PanicInfo("User Field(" + field_meta.get_name().get() + ") is not loaded");
|
||||
}
|
||||
}
|
||||
check_search(const query::Plan* plan) const override;
|
||||
|
||||
private:
|
||||
template <typename T>
|
||||
static void
|
||||
bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw) {
|
||||
static_assert(IsScalar<T>);
|
||||
auto src = reinterpret_cast<const T*>(src_raw);
|
||||
auto dst = reinterpret_cast<T*>(dst_raw);
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = seg_offsets[i];
|
||||
dst[i] = offset == -1 ? -1 : src[offset];
|
||||
}
|
||||
}
|
||||
bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw);
|
||||
|
||||
static void
|
||||
bulk_subscript_impl(
|
||||
int64_t element_sizeof, const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw);
|
||||
|
||||
void
|
||||
update_row_count(int64_t row_count) {
|
||||
@ -135,25 +107,16 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
return system_ready_count_ == 1;
|
||||
}
|
||||
|
||||
bool
|
||||
is_field_ready(FieldOffset field_offset) const {
|
||||
return field_ready_bitset_.test(field_offset.get());
|
||||
}
|
||||
|
||||
void
|
||||
set_field_ready(FieldOffset field_offset, bool flag = true) {
|
||||
field_ready_bitset_[field_offset.get()] = flag;
|
||||
}
|
||||
|
||||
private:
|
||||
// segment loading state
|
||||
boost::dynamic_bitset<> field_ready_bitset_;
|
||||
boost::dynamic_bitset<> field_data_ready_bitset_;
|
||||
boost::dynamic_bitset<> vecindex_ready_bitset_;
|
||||
std::atomic<int> system_ready_count_ = 0;
|
||||
// segment datas
|
||||
// TODO: generate index for scalar
|
||||
std::optional<int64_t> row_count_opt_;
|
||||
std::map<FieldOffset, knowhere::IndexPtr> scalar_indexings_;
|
||||
SealedIndexingRecord vec_indexings_;
|
||||
std::vector<std::unique_ptr<knowhere::Index>> scalar_indexings_;
|
||||
SealedIndexingRecord vecindexs_;
|
||||
std::vector<aligned_vector<char>> field_datas_;
|
||||
aligned_vector<idx_t> row_ids_;
|
||||
SchemaPtr schema_;
|
||||
|
@ -247,7 +247,7 @@ TEST(Indexing, BinaryBruteForce) {
|
||||
auto dataset = DataGen(schema, N, 10);
|
||||
auto bin_vec = dataset.get_col<uint8_t>(0);
|
||||
auto query_data = 1024 * dim / 8 + bin_vec.data();
|
||||
query::dataset::BinaryQueryDataset query_dataset{
|
||||
query::dataset::QueryDataset query_dataset{
|
||||
faiss::MetricType::METRIC_Jaccard, //
|
||||
num_queries, //
|
||||
topk, //
|
||||
|
@ -10,6 +10,7 @@
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include "query/ScalarIndex.h"
|
||||
|
||||
TEST(TestNaive, Naive) {
|
||||
EXPECT_TRUE(true);
|
||||
|
@ -129,6 +129,100 @@ TEST(Query, ParsePlaceholderGroup) {
|
||||
auto placeholder = ParsePlaceholderGroup(plan.get(), blob);
|
||||
}
|
||||
|
||||
// Inserts 1M generated rows (16-dim float vector "fakevec" + float "age") into
// a growing segment, runs a top-5 L2 search for 5 query vectors restricted to
// -1 <= age < 1, and compares the JSON form of the result to a fixed reference.
TEST(Query, ExecWithPredicateLoader) {
    using namespace milvus::query;
    using namespace milvus::segcore;

    auto schema = std::make_shared<Schema>();
    schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, MetricType::METRIC_L2);
    schema->AddDebugField("age", DataType::FLOAT);

    // Range predicate on "age" combined with a vector search on "fakevec".
    std::string dsl = R"({
        "bool": {
            "must": [
            {
                "range": {
                    "age": {
                        "GE": -1,
                        "LT": 1
                    }
                }
            },
            {
                "vector": {
                    "fakevec": {
                        "metric_type": "L2",
                        "params": {
                            "nprobe": 10
                        },
                        "query": "$0",
                        "topk": 5
                    }
                }
            }
            ]
        }
    })";

    int64_t N = 1000 * 1000;
    auto generated = DataGen(schema, N);
    auto growing = CreateGrowingSegment(schema);
    growing->PreInsert(N);

    // Feed the generated columns in column-based form.
    ColumnBasedRawData column_data;
    column_data.columns_ = generated.cols_;
    column_data.count = N;
    growing->Insert(0, N, generated.row_ids_.data(), generated.timestamps_.data(), column_data);

    auto plan = CreatePlan(*schema, dsl);
    auto num_queries = 5;
    auto placeholder_proto = CreatePlaceholderGroup(num_queries, 16, 1024);
    auto placeholder = ParsePlaceholderGroup(plan.get(), placeholder_proto.SerializeAsString());
    Timestamp time = 1000000;
    std::vector<const PlaceholderGroup*> placeholder_arr = {placeholder.get()};
    auto qr = growing->Search(plan.get(), placeholder_arr.data(), &time, 1);
    int topk = 5;

    Json json = QueryResultToJson(qr);
    auto ref = json::parse(R"(
[
  [
    [
      "980486->3.149221",
      "318367->3.661235",
      "302798->4.553688",
      "321424->4.757450",
      "565529->5.083780"
    ],
    [
      "233390->7.931535",
      "238958->8.109344",
      "230645->8.439169",
      "901939->8.658772",
      "380328->8.731251"
    ],
    [
      "897246->3.749835",
      "750683->3.897577",
      "857598->4.230977",
      "299009->4.379639",
      "440010->4.454046"
    ],
    [
      "840855->4.782170",
      "709627->5.063170",
      "72322->5.166143",
      "107142->5.180207",
      "948403->5.247065"
    ],
    [
      "810401->3.926393",
      "46575->4.054171",
      "201740->4.274491",
      "669040->4.399628",
      "231500->4.831223"
    ]
  ]
])");
    ASSERT_EQ(json.dump(2), ref.dump(2));
}
|
||||
|
||||
TEST(Query, ExecWithPredicate) {
|
||||
using namespace milvus::query;
|
||||
using namespace milvus::segcore;
|
||||
@ -449,6 +543,7 @@ TEST(Query, FillSegment) {
|
||||
proto.set_name("col");
|
||||
proto.set_description("asdfhsalkgfhsadg");
|
||||
proto.set_autoid(false);
|
||||
auto dim = 16;
|
||||
|
||||
{
|
||||
auto field = proto.add_fields();
|
||||
@ -474,12 +569,49 @@ TEST(Query, FillSegment) {
|
||||
field->set_data_type(pb::schema::DataType::INT64);
|
||||
}
|
||||
|
||||
{
|
||||
auto field = proto.add_fields();
|
||||
field->set_name("the_value");
|
||||
field->set_fieldid(102);
|
||||
field->set_is_primary_key(false);
|
||||
field->set_description("asdgfsagf");
|
||||
field->set_data_type(pb::schema::DataType::INT32);
|
||||
}
|
||||
|
||||
auto schema = Schema::ParseFrom(proto);
|
||||
auto segment = CreateGrowingSegment(schema);
|
||||
|
||||
// dispatch here
|
||||
int N = 100000;
|
||||
auto dataset = DataGen(schema, N);
|
||||
segment->PreInsert(N);
|
||||
segment->Insert(0, N, dataset.row_ids_.data(), dataset.timestamps_.data(), dataset.raw_);
|
||||
const auto std_vec = dataset.get_col<int64_t>(1);
|
||||
const auto std_vfloat_vec = dataset.get_col<float>(0);
|
||||
const auto std_i32_vec = dataset.get_col<int32_t>(2);
|
||||
|
||||
std::vector<std::unique_ptr<SegmentInternalInterface>> segments;
|
||||
segments.emplace_back([&] {
|
||||
auto segment = CreateGrowingSegment(schema);
|
||||
segment->PreInsert(N);
|
||||
segment->Insert(0, N, dataset.row_ids_.data(), dataset.timestamps_.data(), dataset.raw_);
|
||||
return segment;
|
||||
}());
|
||||
segments.emplace_back([&] {
|
||||
auto segment = CreateSealedSegment(schema);
|
||||
SealedLoader(dataset, *segment);
|
||||
// auto indexing = GenIndexing(N, dim, std_vfloat_vec.data());
|
||||
|
||||
// LoadIndexInfo info;
|
||||
// auto field_offset = schema->get_offset(FieldName("fakevec"));
|
||||
// auto& meta = schema->operator[](field_offset);
|
||||
|
||||
// info.field_id = meta.get_id().get();
|
||||
// info.field_name = meta.get_name().get();
|
||||
// info.index_params["metric_type"] = "L2";
|
||||
// info.index = indexing;
|
||||
|
||||
// segment->LoadIndex(info);
|
||||
return segment;
|
||||
}());
|
||||
|
||||
std::string dsl = R"({
|
||||
"bool": {
|
||||
"must": [
|
||||
@ -503,27 +635,44 @@ TEST(Query, FillSegment) {
|
||||
auto ph = ParsePlaceholderGroup(plan.get(), ph_proto.SerializeAsString());
|
||||
std::vector<const PlaceholderGroup*> groups = {ph.get()};
|
||||
std::vector<Timestamp> timestamps = {N * 2UL};
|
||||
QueryResult result;
|
||||
result = segment->Search(plan.get(), groups.data(), timestamps.data(), 1);
|
||||
|
||||
auto topk = 5;
|
||||
auto num_queries = 10;
|
||||
|
||||
result.result_offsets_.resize(topk * num_queries);
|
||||
segment->FillTargetEntry(plan.get(), result);
|
||||
for (auto& segment : segments) {
|
||||
plan->target_entries_.clear();
|
||||
plan->target_entries_.push_back(schema->get_offset(FieldName("fakevec")));
|
||||
plan->target_entries_.push_back(schema->get_offset(FieldName("the_value")));
|
||||
QueryResult result = segment->Search(plan.get(), groups.data(), timestamps.data(), 1);
|
||||
// std::cout << QueryResultToJson(result).dump(2);
|
||||
result.result_offsets_.resize(topk * num_queries);
|
||||
segment->FillTargetEntry(plan.get(), result);
|
||||
|
||||
auto ans = result.row_data_;
|
||||
ASSERT_EQ(ans.size(), topk * num_queries);
|
||||
int64_t std_index = 0;
|
||||
auto std_vec = dataset.get_col<int64_t>(1);
|
||||
for (auto& vec : ans) {
|
||||
ASSERT_EQ(vec.size(), sizeof(int64_t));
|
||||
int64_t val;
|
||||
memcpy(&val, vec.data(), sizeof(int64_t));
|
||||
auto internal_offset = result.internal_seg_offsets_[std_index];
|
||||
auto std_val = std_vec[internal_offset];
|
||||
ASSERT_EQ(val, std_val) << "io:" << internal_offset;
|
||||
++std_index;
|
||||
auto ans = result.row_data_;
|
||||
ASSERT_EQ(ans.size(), topk * num_queries);
|
||||
int64_t std_index = 0;
|
||||
|
||||
for (auto& vec : ans) {
|
||||
ASSERT_EQ(vec.size(), sizeof(int64_t) + sizeof(float) * dim + sizeof(int32_t));
|
||||
int64_t val;
|
||||
memcpy(&val, vec.data(), sizeof(int64_t));
|
||||
|
||||
auto internal_offset = result.internal_seg_offsets_[std_index];
|
||||
auto std_val = std_vec[internal_offset];
|
||||
auto std_i32 = std_i32_vec[internal_offset];
|
||||
std::vector<float> std_vfloat(dim);
|
||||
std::copy_n(std_vfloat_vec.begin() + dim * internal_offset, dim, std_vfloat.begin());
|
||||
|
||||
ASSERT_EQ(val, std_val) << "io:" << internal_offset;
|
||||
if (val != -1) {
|
||||
std::vector<float> vfloat(dim);
|
||||
int i32;
|
||||
memcpy(vfloat.data(), vec.data() + sizeof(int64_t), dim * sizeof(float));
|
||||
memcpy(&i32, vec.data() + sizeof(int64_t) + dim * sizeof(float), sizeof(int32_t));
|
||||
ASSERT_EQ(vfloat, std_vfloat) << std_index;
|
||||
ASSERT_EQ(i32, std_i32) << std_index;
|
||||
}
|
||||
++std_index;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -214,32 +214,6 @@ TEST(Sealed, with_predicate) {
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
SealedLoader(const GeneratedData& dataset, SegmentSealed& seg) {
|
||||
// TODO
|
||||
auto row_count = dataset.row_ids_.size();
|
||||
{
|
||||
LoadFieldDataInfo info;
|
||||
info.blob = dataset.row_ids_.data();
|
||||
info.row_count = dataset.row_ids_.size();
|
||||
info.field_id = 0; // field id for RowId
|
||||
seg.LoadFieldData(info);
|
||||
}
|
||||
int field_offset = 0;
|
||||
for (auto& meta : seg.get_schema().get_fields()) {
|
||||
if (meta.is_vector()) {
|
||||
++field_offset;
|
||||
continue;
|
||||
}
|
||||
LoadFieldDataInfo info;
|
||||
info.field_id = meta.get_id().get();
|
||||
info.row_count = row_count;
|
||||
info.blob = dataset.cols_[field_offset].data();
|
||||
seg.LoadFieldData(info);
|
||||
++field_offset;
|
||||
}
|
||||
}
|
||||
|
||||
TEST(Sealed, LoadFieldData) {
|
||||
auto dim = 16;
|
||||
auto topK = 5;
|
||||
@ -255,16 +229,7 @@ TEST(Sealed, LoadFieldData) {
|
||||
|
||||
auto fakevec = dataset.get_col<float>(0);
|
||||
|
||||
auto conf = knowhere::Config{{knowhere::meta::DIM, dim},
|
||||
{knowhere::meta::TOPK, topK},
|
||||
{knowhere::IndexParams::nlist, 100},
|
||||
{knowhere::IndexParams::nprobe, 10},
|
||||
{knowhere::Metric::TYPE, milvus::knowhere::Metric::L2},
|
||||
{knowhere::meta::DEVICEID, 0}};
|
||||
auto database = knowhere::GenDataset(N, dim, fakevec.data());
|
||||
auto indexing = std::make_shared<knowhere::IVF>();
|
||||
indexing->Train(database, conf);
|
||||
indexing->AddWithoutIds(database, conf);
|
||||
auto indexing = GenIndexing(N, dim, fakevec.data());
|
||||
|
||||
auto segment = CreateSealedSegment(schema);
|
||||
std::string dsl = R"({
|
||||
@ -305,7 +270,9 @@ TEST(Sealed, LoadFieldData) {
|
||||
|
||||
SealedLoader(dataset, *segment);
|
||||
segment->DropFieldData(nothing_id);
|
||||
segment->Search(plan.get(), ph_group_arr.data(), &time, 1);
|
||||
|
||||
segment->DropFieldData(fakevec_id);
|
||||
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group_arr.data(), &time, 1));
|
||||
|
||||
LoadIndexInfo vec_info;
|
||||
@ -336,4 +303,46 @@ TEST(Sealed, LoadFieldData) {
|
||||
ASSERT_EQ(json.dump(-2), json2.dump(-2));
|
||||
segment->DropFieldData(double_id);
|
||||
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group_arr.data(), &time, 1));
|
||||
auto std_json = Json::parse(R"(
|
||||
[
|
||||
[
|
||||
[
|
||||
"980486->3.149221",
|
||||
"579754->3.634295",
|
||||
"318367->3.661235",
|
||||
"265835->4.333358",
|
||||
"302798->4.553688"
|
||||
],
|
||||
[
|
||||
"233390->7.931535",
|
||||
"238958->8.109344",
|
||||
"230645->8.439169",
|
||||
"901939->8.658772",
|
||||
"380328->8.731251"
|
||||
],
|
||||
[
|
||||
"897246->3.749835",
|
||||
"750683->3.897577",
|
||||
"857598->4.230977",
|
||||
"299009->4.379639",
|
||||
"440010->4.454046"
|
||||
],
|
||||
[
|
||||
"37641->3.783446",
|
||||
"22628->4.719435",
|
||||
"840855->4.782170",
|
||||
"709627->5.063170",
|
||||
"635836->5.156095"
|
||||
],
|
||||
[
|
||||
"810401->3.926393",
|
||||
"46575->4.054171",
|
||||
"201740->4.274491",
|
||||
"669040->4.399628",
|
||||
"231500->4.831223"
|
||||
]
|
||||
]
|
||||
]
|
||||
)");
|
||||
ASSERT_EQ(std_json.dump(-2), json.dump(-2));
|
||||
}
|
||||
|
@ -15,15 +15,22 @@
|
||||
#include <memory>
|
||||
#include <cstring>
|
||||
#include "segcore/SegmentGrowing.h"
|
||||
#include "segcore/SegmentSealed.h"
|
||||
#include "Constants.h"
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
|
||||
#include <knowhere/index/vector_index/VecIndex.h>
|
||||
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
|
||||
#include <knowhere/index/vector_index/VecIndexFactory.h>
|
||||
#include <knowhere/index/vector_index/IndexIVF.h>
|
||||
#include <query/SearchOnIndex.h>
|
||||
using boost::algorithm::starts_with;
|
||||
|
||||
namespace milvus::segcore {
|
||||
|
||||
struct GeneratedData {
|
||||
std::vector<char> rows_;
|
||||
std::vector<std::vector<char>> cols_;
|
||||
std::vector<aligned_vector<uint8_t>> cols_;
|
||||
std::vector<idx_t> row_ids_;
|
||||
std::vector<Timestamp> timestamps_;
|
||||
RowBasedRawData raw_;
|
||||
@ -51,6 +58,7 @@ struct GeneratedData {
|
||||
void
|
||||
generate_rows(int N, SchemaPtr schema);
|
||||
};
|
||||
|
||||
inline void
|
||||
GeneratedData::generate_rows(int N, SchemaPtr schema) {
|
||||
std::vector<int> offset_infos(schema->size() + 1, 0);
|
||||
@ -78,7 +86,7 @@ GeneratedData::generate_rows(int N, SchemaPtr schema) {
|
||||
inline GeneratedData
|
||||
DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) {
|
||||
using std::vector;
|
||||
std::vector<vector<char>> cols;
|
||||
std::vector<aligned_vector<uint8_t>> cols;
|
||||
std::default_random_engine er(seed);
|
||||
std::normal_distribution<> distr(0, 1);
|
||||
int offset = 0;
|
||||
@ -86,7 +94,7 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) {
|
||||
auto insert_cols = [&cols](auto& data) {
|
||||
using T = std::remove_reference_t<decltype(data)>;
|
||||
auto len = sizeof(typename T::value_type) * data.size();
|
||||
auto ptr = vector<char>(len);
|
||||
auto ptr = aligned_vector<uint8_t>(len);
|
||||
memcpy(ptr.data(), data.data(), len);
|
||||
cols.emplace_back(std::move(ptr));
|
||||
};
|
||||
@ -280,4 +288,40 @@ QueryResultToJson(const QueryResult& qr) {
|
||||
return json{results};
|
||||
};
|
||||
|
||||
inline void
|
||||
SealedLoader(const GeneratedData& dataset, SegmentSealed& seg) {
|
||||
// TODO
|
||||
auto row_count = dataset.row_ids_.size();
|
||||
{
|
||||
LoadFieldDataInfo info;
|
||||
info.blob = dataset.row_ids_.data();
|
||||
info.row_count = dataset.row_ids_.size();
|
||||
info.field_id = 0; // field id for RowId
|
||||
seg.LoadFieldData(info);
|
||||
}
|
||||
int field_offset = 0;
|
||||
for (auto& meta : seg.get_schema().get_fields()) {
|
||||
LoadFieldDataInfo info;
|
||||
info.field_id = meta.get_id().get();
|
||||
info.row_count = row_count;
|
||||
info.blob = dataset.cols_[field_offset].data();
|
||||
seg.LoadFieldData(info);
|
||||
++field_offset;
|
||||
}
|
||||
}
|
||||
|
||||
// Test helper: trains a knowhere IVF index on N vectors of dimension `dim`
// read from `vec`, adds them without explicit ids, and returns the index.
// Configuration: L2 metric, nlist = 100, nprobe = 10, device id 0.
inline knowhere::VecIndexPtr
GenIndexing(int64_t N, int64_t dim, const float* vec) {
    auto build_conf = knowhere::Config{{knowhere::meta::DIM, dim},
                                       {knowhere::IndexParams::nlist, 100},
                                       {knowhere::IndexParams::nprobe, 10},
                                       {knowhere::Metric::TYPE, milvus::knowhere::Metric::L2},
                                       {knowhere::meta::DEVICEID, 0}};
    auto base_vectors = knowhere::GenDataset(N, dim, vec);

    auto ivf_index = std::make_shared<knowhere::IVF>();
    ivf_index->Train(base_vectors, build_conf);
    ivf_index->AddWithoutIds(base_vectors, build_conf);
    return ivf_index;
}
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
@ -490,9 +490,9 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
|
||||
wg.Add(1)
|
||||
go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex)
|
||||
}
|
||||
ms.consumerLock.Unlock()
|
||||
wg.Wait()
|
||||
timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex)
|
||||
ms.consumerLock.Unlock()
|
||||
if !ok || timeStamp <= ms.lastTimeStamp {
|
||||
//log.Printf("All timeTick's timestamps are inconsistent")
|
||||
continue
|
||||
@ -501,6 +501,9 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
|
||||
msgPositions := make([]*internalpb2.MsgPosition, 0)
|
||||
ms.unsolvedMutex.Lock()
|
||||
for consumer, msgs := range ms.unsolvedBuf {
|
||||
if len(msgs) == 0 {
|
||||
continue
|
||||
}
|
||||
tempBuffer := make([]TsMsg, 0)
|
||||
var timeTickMsg TsMsg
|
||||
for _, v := range msgs {
|
||||
|
@ -590,11 +590,33 @@ func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) er
|
||||
}
|
||||
|
||||
dmChannels := resp.Values
|
||||
channels2NodeID := qs.shuffleChannelsToQueryNode(dmChannels)
|
||||
watchedChannels2NodeID := make(map[string]UniqueID)
|
||||
unWatchedChannels := make([]string, 0)
|
||||
for _, channel := range dmChannels {
|
||||
findChannel := false
|
||||
for nodeID, node := range qs.queryNodes {
|
||||
watchedChannels := node.dmChannelNames
|
||||
for _, watchedChannel := range watchedChannels {
|
||||
if channel == watchedChannel {
|
||||
findChannel = true
|
||||
watchedChannels2NodeID[channel] = nodeID
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if !findChannel {
|
||||
unWatchedChannels = append(unWatchedChannels, channel)
|
||||
}
|
||||
}
|
||||
channels2NodeID := qs.shuffleChannelsToQueryNode(unWatchedChannels)
|
||||
err = qs.replica.addDmChannels(dbID, collection.id, channels2NodeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = qs.replica.addDmChannels(dbID, collection.id, watchedChannels2NodeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
node2channels := make(map[UniqueID][]string)
|
||||
for channel, nodeID := range channels2NodeID {
|
||||
if _, ok := node2channels[nodeID]; ok {
|
||||
@ -625,6 +647,9 @@ func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) er
|
||||
func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) map[string]UniqueID {
|
||||
maxNumDMChannel := 0
|
||||
res := make(map[string]UniqueID)
|
||||
if len(dmChannels) == 0 {
|
||||
return res
|
||||
}
|
||||
node2lens := make(map[UniqueID]int)
|
||||
for id, node := range qs.queryNodes {
|
||||
node2lens[id] = len(node.dmChannelNames)
|
||||
|
Loading…
Reference in New Issue
Block a user