mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
Fix double copy varchar field while loading (#22114)
Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
fef12b53ef
commit
187788059b
@ -12,6 +12,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <google/protobuf/text_format.h>
|
#include <google/protobuf/text_format.h>
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
#include "common/Consts.h"
|
#include "common/Consts.h"
|
||||||
@ -85,14 +86,14 @@ PostfixMatch(const std::string& str, const std::string& postfix) {
|
|||||||
inline int64_t
|
inline int64_t
|
||||||
upper_align(int64_t value, int64_t align) {
|
upper_align(int64_t value, int64_t align) {
|
||||||
Assert(align > 0);
|
Assert(align > 0);
|
||||||
auto groups = (value + align - 1) / align;
|
auto groups = value / align + (value % align != 0);
|
||||||
return groups * align;
|
return groups * align;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline int64_t
|
inline int64_t
|
||||||
upper_div(int64_t value, int64_t align) {
|
upper_div(int64_t value, int64_t align) {
|
||||||
Assert(align > 0);
|
Assert(align > 0);
|
||||||
auto groups = (value + align - 1) / align;
|
auto groups = value / align + (value % align != 0);
|
||||||
return groups;
|
return groups;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,10 +109,16 @@ VectorBase::fill_chunk_data(ssize_t element_count, const DataArray* data, const
|
|||||||
return fill_chunk_data(data->scalars().double_data().data().data(), element_count);
|
return fill_chunk_data(data->scalars().double_data().data().data(), element_count);
|
||||||
}
|
}
|
||||||
case DataType::VARCHAR: {
|
case DataType::VARCHAR: {
|
||||||
auto begin = data->scalars().string_data().data().begin();
|
auto vec = static_cast<ConcurrentVector<std::string>*>(this);
|
||||||
auto end = data->scalars().string_data().data().end();
|
auto count = data->scalars().string_data().data().size();
|
||||||
std::vector<std::string> data_raw(begin, end);
|
vec->grow_on_demand(count);
|
||||||
return fill_chunk_data(data_raw.data(), element_count);
|
auto& chunk = vec->get_chunk(0);
|
||||||
|
|
||||||
|
size_t index = 0;
|
||||||
|
for (auto& str : data->scalars().string_data().data()) {
|
||||||
|
chunk[index++] = str;
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
PanicInfo("unsupported");
|
PanicInfo("unsupported");
|
||||||
|
@ -11,18 +11,18 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <tbb/concurrent_vector.h>
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <shared_mutex>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <shared_mutex>
|
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include <tbb/concurrent_vector.h>
|
|
||||||
|
|
||||||
#include "common/FieldMeta.h"
|
#include "common/FieldMeta.h"
|
||||||
#include "common/Span.h"
|
#include "common/Span.h"
|
||||||
#include "common/Types.h"
|
#include "common/Types.h"
|
||||||
@ -148,6 +148,12 @@ class ConcurrentVectorImpl : public VectorBase {
|
|||||||
chunks_.emplace_to_at_least(chunk_count, Dim * size_per_chunk_);
|
chunks_.emplace_to_at_least(chunk_count, Dim * size_per_chunk_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
grow_on_demand(int64_t element_count) {
|
||||||
|
auto chunk_count = upper_div(element_count, size_per_chunk_);
|
||||||
|
chunks_.emplace_to_at_least(chunk_count, Dim * element_count);
|
||||||
|
}
|
||||||
|
|
||||||
Span<TraitType>
|
Span<TraitType>
|
||||||
get_span(int64_t chunk_id) const {
|
get_span(int64_t chunk_id) const {
|
||||||
auto& chunk = get_chunk(chunk_id);
|
auto& chunk = get_chunk(chunk_id);
|
||||||
@ -224,6 +230,11 @@ class ConcurrentVectorImpl : public VectorBase {
|
|||||||
return chunks_[chunk_index];
|
return chunks_[chunk_index];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Chunk&
|
||||||
|
get_chunk(ssize_t index) {
|
||||||
|
return chunks_[index];
|
||||||
|
}
|
||||||
|
|
||||||
const void*
|
const void*
|
||||||
get_chunk_data(ssize_t chunk_index) const override {
|
get_chunk_data(ssize_t chunk_index) const override {
|
||||||
return chunks_[chunk_index].data();
|
return chunks_[chunk_index].data();
|
||||||
@ -274,7 +285,7 @@ class ConcurrentVectorImpl : public VectorBase {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto chunk_max_size = chunks_.size();
|
auto chunk_max_size = chunks_.size();
|
||||||
Assert(chunk_id < chunk_max_size);
|
AssertInfo(chunk_id < chunk_max_size, "chunk_id=" + std::to_string(chunk_id));
|
||||||
Chunk& chunk = chunks_[chunk_id];
|
Chunk& chunk = chunks_[chunk_id];
|
||||||
auto ptr = chunk.data();
|
auto ptr = chunk.data();
|
||||||
std::copy_n(source + source_offset * Dim, element_count * Dim, ptr + chunk_offset * Dim);
|
std::copy_n(source + source_offset * Dim, element_count * Dim, ptr + chunk_offset * Dim);
|
||||||
|
Loading…
Reference in New Issue
Block a user