enhance: allow many segments for inverted index (#35616)

fix: https://github.com/milvus-io/milvus/issues/35615

---------

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2024-08-28 11:30:59 +08:00 committed by GitHub
parent d8aa01bc1a
commit a52ba3d09d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 577 additions and 307 deletions

View File

@ -420,10 +420,10 @@ InvertedIndexTantivy<T>::BuildWithRawData(size_t n,
// only used in ut.
auto arr = static_cast<const boost::container::vector<T>*>(values);
for (size_t i = 0; i < n; i++) {
wrapper_->template add_multi_data(arr[i].data(), arr[i].size());
wrapper_->template add_multi_data(arr[i].data(), arr[i].size(), i);
}
} else {
wrapper_->add_data<T>(static_cast<const T*>(values), n);
wrapper_->add_data<T>(static_cast<const T*>(values), n, 0);
}
finish();
}
@ -449,20 +449,27 @@ InvertedIndexTantivy<T>::BuildWithFieldData(
case proto::schema::DataType::Double:
case proto::schema::DataType::String:
case proto::schema::DataType::VarChar: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
if (schema_.nullable()) {
int64_t offset = 0;
if (schema_.nullable()) {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
for (int i = 0; i < n; i++) {
if (!data->is_valid(i)) {
null_offset.push_back(i);
}
wrapper_->add_multi_data<T>(
static_cast<const T*>(data->RawValue(i)),
data->is_valid(i));
data->is_valid(i),
offset++);
}
continue;
}
wrapper_->add_data<T>(static_cast<const T*>(data->Data()), n);
} else {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<T>(
static_cast<const T*>(data->Data()), n, offset);
offset += n;
}
}
break;
}
@ -483,6 +490,7 @@ template <typename T>
void
InvertedIndexTantivy<T>::build_index_for_array(
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
int64_t offset = 0;
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
auto array_column = static_cast<const Array*>(data->Data());
@ -494,7 +502,9 @@ InvertedIndexTantivy<T>::build_index_for_array(
}
auto length = data->is_valid(i) ? array_column[i].length() : 0;
wrapper_->template add_multi_data(
reinterpret_cast<const T*>(array_column[i].data()), length);
reinterpret_cast<const T*>(array_column[i].data()),
length,
offset++);
}
}
}
@ -503,6 +513,7 @@ template <>
void
InvertedIndexTantivy<std::string>::build_index_for_array(
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
int64_t offset = 0;
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
auto array_column = static_cast<const Array*>(data->Data());
@ -519,7 +530,7 @@ InvertedIndexTantivy<std::string>::build_index_for_array(
array_column[i].template get_data<std::string>(j));
}
auto length = data->is_valid(i) ? output.size() : 0;
wrapper_->template add_multi_data(output.data(), length);
wrapper_->template add_multi_data(output.data(), length, offset++);
}
}
}

View File

@ -34,7 +34,7 @@ add_custom_target(tantivy_binding_target DEPENDS compile_tantivy ls_cargo_target
set(INSTALL_COMMAND
cp ${LIB_HEADER_FOLDER}/tantivy-binding.h ${TANTIVY_INCLUDE_DIR}/ &&
cp ${CMAKE_CURRENT_SOURCE_DIR}/tantivy-wrapper.h ${TANTIVY_INCLUDE_DIR}/ &&
cp ${CMAKE_CURRENT_SOURCE_DIR}/*.h ${TANTIVY_INCLUDE_DIR}/ &&
cp ${LIB_FILE} ${TANTIVY_LIB_DIR}/)
add_custom_command(OUTPUT install_tantivy
COMMENT "Install tantivy target ${LIB_FILE} to ${TANTIVY_LIB_DIR}"

View File

@ -29,7 +29,7 @@ build_index(size_t n = 1000000) {
arr.push_back(std::to_string(x));
}
w.add_data<std::string>(arr.data(), arr.size());
w.add_data<std::string>(arr.data(), arr.size(), 0);
w.finish();
assert(w.count() == n);

View File

@ -0,0 +1,65 @@
#pragma once
#include <iostream>
#include <sstream>
#include "tantivy-binding.h"
#include "rust-binding.h"
namespace milvus::tantivy {
struct RustArrayWrapper {
NO_COPY_OR_ASSIGN(RustArrayWrapper);
explicit RustArrayWrapper(RustArray array) : array_(array) {
}
RustArrayWrapper(RustArrayWrapper&& other) noexcept {
array_.array = other.array_.array;
array_.len = other.array_.len;
array_.cap = other.array_.cap;
other.array_.array = nullptr;
other.array_.len = 0;
other.array_.cap = 0;
}
RustArrayWrapper&
operator=(RustArrayWrapper&& other) noexcept {
if (this != &other) {
free();
array_.array = other.array_.array;
array_.len = other.array_.len;
array_.cap = other.array_.cap;
other.array_.array = nullptr;
other.array_.len = 0;
other.array_.cap = 0;
}
return *this;
}
~RustArrayWrapper() {
free();
}
void
debug() {
std::stringstream ss;
ss << "[ ";
for (int i = 0; i < array_.len; i++) {
ss << array_.array[i] << " ";
}
ss << "]";
std::cout << ss.str() << std::endl;
}
RustArray array_;
private:
void
free() {
if (array_.array != nullptr) {
free_rust_array(array_);
}
}
};
} // namespace milvus::tantivy

View File

@ -0,0 +1,7 @@
#pragma once
namespace milvus::tantivy {
#define NO_COPY_OR_ASSIGN(ClassName) \
ClassName(const ClassName&) = delete; \
ClassName& operator=(const ClassName&) = delete;
} // namespace milvus::tantivy

View File

@ -23,6 +23,8 @@ extern "C" {
void free_rust_array(RustArray array);
void print_vector_of_strings(const char *const *ptr, uintptr_t len);
void *tantivy_load_index(const char *path);
void tantivy_free_index_reader(void *ptr);
@ -75,46 +77,51 @@ RustArray tantivy_prefix_query_keyword(void *ptr, const char *prefix);
RustArray tantivy_regex_query(void *ptr, const char *pattern);
void *tantivy_create_index(const char *field_name, TantivyDataType data_type, const char *path);
void *tantivy_create_index(const char *field_name,
TantivyDataType data_type,
const char *path,
uintptr_t num_threads,
uintptr_t overall_memory_budget_in_bytes);
void tantivy_free_index_writer(void *ptr);
void tantivy_finish_index(void *ptr);
void tantivy_index_add_int8s(void *ptr, const int8_t *array, uintptr_t len);
void tantivy_index_add_int8s(void *ptr, const int8_t *array, uintptr_t len, int64_t offset_begin);
void tantivy_index_add_int16s(void *ptr, const int16_t *array, uintptr_t len);
void tantivy_index_add_int16s(void *ptr, const int16_t *array, uintptr_t len, int64_t offset_begin);
void tantivy_index_add_int32s(void *ptr, const int32_t *array, uintptr_t len);
void tantivy_index_add_int32s(void *ptr, const int32_t *array, uintptr_t len, int64_t offset_begin);
void tantivy_index_add_int64s(void *ptr, const int64_t *array, uintptr_t len);
void tantivy_index_add_int64s(void *ptr, const int64_t *array, uintptr_t len, int64_t offset_begin);
void tantivy_index_add_f32s(void *ptr, const float *array, uintptr_t len);
void tantivy_index_add_f32s(void *ptr, const float *array, uintptr_t len, int64_t offset_begin);
void tantivy_index_add_f64s(void *ptr, const double *array, uintptr_t len);
void tantivy_index_add_f64s(void *ptr, const double *array, uintptr_t len, int64_t offset_begin);
void tantivy_index_add_bools(void *ptr, const bool *array, uintptr_t len);
void tantivy_index_add_bools(void *ptr, const bool *array, uintptr_t len, int64_t offset_begin);
void tantivy_index_add_keyword(void *ptr, const char *s);
void tantivy_index_add_string(void *ptr, const char *s, int64_t offset);
void tantivy_index_add_multi_int8s(void *ptr, const int8_t *array, uintptr_t len);
void tantivy_index_add_multi_int8s(void *ptr, const int8_t *array, uintptr_t len, int64_t offset);
void tantivy_index_add_multi_int16s(void *ptr, const int16_t *array, uintptr_t len);
void tantivy_index_add_multi_int16s(void *ptr, const int16_t *array, uintptr_t len, int64_t offset);
void tantivy_index_add_multi_int32s(void *ptr, const int32_t *array, uintptr_t len);
void tantivy_index_add_multi_int32s(void *ptr, const int32_t *array, uintptr_t len, int64_t offset);
void tantivy_index_add_multi_int64s(void *ptr, const int64_t *array, uintptr_t len);
void tantivy_index_add_multi_int64s(void *ptr, const int64_t *array, uintptr_t len, int64_t offset);
void tantivy_index_add_multi_f32s(void *ptr, const float *array, uintptr_t len);
void tantivy_index_add_multi_f32s(void *ptr, const float *array, uintptr_t len, int64_t offset);
void tantivy_index_add_multi_f64s(void *ptr, const double *array, uintptr_t len);
void tantivy_index_add_multi_f64s(void *ptr, const double *array, uintptr_t len, int64_t offset);
void tantivy_index_add_multi_bools(void *ptr, const bool *array, uintptr_t len);
void tantivy_index_add_multi_bools(void *ptr, const bool *array, uintptr_t len, int64_t offset);
void tantivy_index_add_multi_keywords(void *ptr, const char *const *array, uintptr_t len);
void tantivy_index_add_multi_keywords(void *ptr,
const char *const *array,
uintptr_t len,
int64_t offset);
bool tantivy_index_exist(const char *path);
void print_vector_of_strings(const char *const *ptr, uintptr_t len);
} // extern "C"

View File

@ -1,14 +1,13 @@
use std::{ffi::{c_char, CStr}, slice};
use std::{
ffi::{c_char, CStr},
slice,
};
#[no_mangle]
pub extern "C" fn print_vector_of_strings(ptr: *const *const c_char, len: usize) {
let arr : &[*const c_char] = unsafe {
slice::from_raw_parts(ptr, len)
};
for element in arr {
let c_str = unsafe {
CStr::from_ptr(*element)
};
println!("{}", c_str.to_str().unwrap());
}
}
let arr: &[*const c_char] = unsafe { slice::from_raw_parts(ptr, len) };
for element in arr {
let c_str = unsafe { CStr::from_ptr(*element) };
println!("{}", c_str.to_str().unwrap());
}
}

View File

@ -0,0 +1,60 @@
use tantivy::{
collector::{Collector, SegmentCollector},
fastfield::Column,
DocId, Score, SegmentOrdinal, SegmentReader,
};
pub(crate) struct DocIdCollector;
impl Collector for DocIdCollector {
type Fruit = Vec<u32>;
type Child = DocIdChildCollector;
fn for_segment(
&self,
_segment_local_id: SegmentOrdinal,
segment: &SegmentReader,
) -> tantivy::Result<Self::Child> {
Ok(DocIdChildCollector {
docs: Vec::new(),
column: segment.fast_fields().i64("doc_id").unwrap(),
})
}
fn requires_scoring(&self) -> bool {
false
}
fn merge_fruits(
&self,
segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
) -> tantivy::Result<Self::Fruit> {
let len: usize = segment_fruits.iter().map(|docset| docset.len()).sum();
let mut result = Vec::with_capacity(len);
for docs in segment_fruits {
for doc in docs {
result.push(doc);
}
}
Ok(result)
}
}
pub(crate) struct DocIdChildCollector {
docs: Vec<u32>,
column: Column<i64>,
}
impl SegmentCollector for DocIdChildCollector {
type Fruit = Vec<u32>;
fn collect(&mut self, doc: DocId, _score: Score) {
self.column.values_for_doc(doc).for_each(|doc_id| {
self.docs.push(doc_id as u32);
})
}
fn harvest(self) -> Self::Fruit {
self.docs
}
}

View File

@ -1,62 +1,82 @@
use std::ops::Bound;
use std::str::FromStr;
use std::sync::Arc;
use tantivy::directory::MmapDirectory;
use tantivy::query::{Query, RangeQuery, RegexQuery, TermQuery};
use tantivy::schema::{Field, IndexRecordOption};
use tantivy::{Index, IndexReader, ReloadPolicy, Term};
use crate::docid_collector::DocIdCollector;
use crate::log::init_log;
use crate::util::make_bounds;
use crate::vec_collector::VecCollector;
pub struct IndexReaderWrapper {
pub field_name: String,
pub field: Field,
pub reader: IndexReader,
pub cnt: u32,
pub(crate) struct IndexReaderWrapper {
pub(crate) field_name: String,
pub(crate) field: Field,
pub(crate) reader: IndexReader,
pub(crate) index: Arc<Index>,
pub(crate) id_field: Option<Field>,
}
impl IndexReaderWrapper {
pub fn new(index: &Index, field_name: &String, field: Field) -> IndexReaderWrapper {
pub fn load(path: &str) -> IndexReaderWrapper {
init_log();
let index = Index::open_in_dir(path).unwrap();
IndexReaderWrapper::from_index(Arc::new(index))
}
pub fn from_index(index: Arc<Index>) -> IndexReaderWrapper {
let field = index.schema().fields().next().unwrap().0;
let schema = index.schema();
let field_name = String::from(schema.get_field_name(field));
let id_field: Option<Field> = match schema.get_field("doc_id") {
Ok(field) => Some(field),
Err(_) => None,
};
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.reload_policy(ReloadPolicy::OnCommit) // OnCommit serve for growing segment.
.try_into()
.unwrap();
let metas = index.searchable_segment_metas().unwrap();
reader.reload().unwrap();
IndexReaderWrapper {
field_name,
field,
reader,
index,
id_field,
}
}
pub fn reload(&self) {
self.reader.reload().unwrap();
}
pub fn count(&self) -> u32 {
let metas = self.index.searchable_segment_metas().unwrap();
let mut sum: u32 = 0;
for meta in metas {
sum += meta.max_doc();
}
reader.reload().unwrap();
IndexReaderWrapper {
field_name: field_name.to_string(),
field,
reader,
cnt: sum,
}
sum
}
pub fn load(path: &str) -> IndexReaderWrapper {
let dir = MmapDirectory::open(path).unwrap();
let index = Index::open(dir).unwrap();
let field = index.schema().fields().next().unwrap().0;
let schema = index.schema();
let field_name = schema.get_field_name(field);
IndexReaderWrapper::new(&index, &String::from_str(field_name).unwrap(), field)
}
pub fn count(&self) -> u32 {
self.cnt
}
fn search(&self, q: &dyn Query) -> Vec<u32> {
pub(crate) fn search(&self, q: &dyn Query) -> Vec<u32> {
let searcher = self.reader.searcher();
let hits = searcher.search(q, &VecCollector).unwrap();
hits
match self.id_field {
Some(_) => {
// newer version with doc_id.
searcher.search(q, &DocIdCollector {}).unwrap()
}
None => {
// older version without doc_id, only one segment.
searcher.search(q, &VecCollector {}).unwrap()
}
}
}
pub fn term_query_i64(&self, term: i64) -> Vec<u32> {

View File

@ -1,23 +1,33 @@
use std::ffi::CStr;
use std::sync::Arc;
use futures::executor::block_on;
use libc::c_char;
use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, INDEXED};
use tantivy::{doc, tokenizer, Index, SingleSegmentIndexWriter, Document};
use tantivy::schema::{
Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, FAST, INDEXED,
};
use tantivy::{doc, tokenizer, Document, Index, IndexWriter};
use crate::data_type::TantivyDataType;
use crate::index_reader::IndexReaderWrapper;
use crate::log::init_log;
pub struct IndexWriterWrapper {
pub field_name: String,
pub field: Field,
pub data_type: TantivyDataType,
pub path: String,
pub index_writer: SingleSegmentIndexWriter,
pub(crate) struct IndexWriterWrapper {
pub(crate) field: Field,
pub(crate) index_writer: IndexWriter,
pub(crate) id_field: Field,
pub(crate) index: Arc<Index>,
}
impl IndexWriterWrapper {
pub fn new(field_name: String, data_type: TantivyDataType, path: String) -> IndexWriterWrapper {
pub fn new(
field_name: String,
data_type: TantivyDataType,
path: String,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> IndexWriterWrapper {
init_log();
let field: Field;
@ -42,6 +52,7 @@ impl IndexWriterWrapper {
use_raw_tokenizer = true;
}
}
let id_field = schema_builder.add_i64_field("doc_id", FAST);
let schema = schema_builder.build();
let index = Index::create_in_dir(path.clone(), schema).unwrap();
if use_raw_tokenizer {
@ -49,126 +60,166 @@ impl IndexWriterWrapper {
.tokenizers()
.register("raw_tokenizer", tokenizer::RawTokenizer::default());
}
let index_writer = SingleSegmentIndexWriter::new(index, 15 * 1024 * 1024).unwrap();
let index_writer = index
.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)
.unwrap();
IndexWriterWrapper {
field_name,
field,
data_type,
path,
index_writer,
id_field,
index: Arc::new(index),
}
}
pub fn add_i8(&mut self, data: i8) {
self.add_i64(data.into())
pub fn add_i8(&mut self, data: i8, offset: i64) {
self.add_i64(data.into(), offset)
}
pub fn add_i16(&mut self, data: i16) {
self.add_i64(data.into())
pub fn add_i16(&mut self, data: i16, offset: i64) {
self.add_i64(data.into(), offset)
}
pub fn add_i32(&mut self, data: i32) {
self.add_i64(data.into())
pub fn add_i32(&mut self, data: i32, offset: i64) {
self.add_i64(data.into(), offset)
}
pub fn add_i64(&mut self, data: i64) {
pub fn add_i64(&mut self, data: i64, offset: i64) {
self.index_writer
.add_document(doc!(self.field => data))
.add_document(doc!(
self.field => data,
self.id_field => offset,
))
.unwrap();
}
pub fn add_f32(&mut self, data: f32) {
self.add_f64(data.into())
pub fn add_f32(&mut self, data: f32, offset: i64) {
self.add_f64(data.into(), offset)
}
pub fn add_f64(&mut self, data: f64) {
pub fn add_f64(&mut self, data: f64, offset: i64) {
self.index_writer
.add_document(doc!(self.field => data))
.add_document(doc!(
self.field => data,
self.id_field => offset,
))
.unwrap();
}
pub fn add_bool(&mut self, data: bool) {
pub fn add_bool(&mut self, data: bool, offset: i64) {
self.index_writer
.add_document(doc!(self.field => data))
.add_document(doc!(
self.field => data,
self.id_field => offset,
))
.unwrap();
}
pub fn add_keyword(&mut self, data: &str) {
pub fn add_string(&mut self, data: &str, offset: i64) {
self.index_writer
.add_document(doc!(self.field => data))
.add_document(doc!(
self.field => data,
self.id_field => offset,
))
.unwrap();
}
pub fn add_multi_i8s(&mut self, datas: &[i8]) {
pub fn add_multi_i8s(&mut self, datas: &[i8], offset: i64) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
}
document.add_i64(self.id_field, offset);
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_i16s(&mut self, datas: &[i16]) {
pub fn add_multi_i16s(&mut self, datas: &[i16], offset: i64) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
}
document.add_i64(self.id_field, offset);
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_i32s(&mut self, datas: &[i32]) {
pub fn add_multi_i32s(&mut self, datas: &[i32], offset: i64) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
}
document.add_i64(self.id_field, offset);
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_i64s(&mut self, datas: &[i64]) {
pub fn add_multi_i64s(&mut self, datas: &[i64], offset: i64) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data);
}
document.add_i64(self.id_field, offset);
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_f32s(&mut self, datas: &[f32]) {
pub fn add_multi_f32s(&mut self, datas: &[f32], offset: i64) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data as f64);
}
document.add_i64(self.id_field, offset);
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_f64s(&mut self, datas: &[f64]) {
pub fn add_multi_f64s(&mut self, datas: &[f64], offset: i64) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data);
}
document.add_i64(self.id_field, offset);
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_bools(&mut self, datas: &[bool]) {
pub fn add_multi_bools(&mut self, datas: &[bool], offset: i64) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data);
}
document.add_i64(self.id_field, offset);
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_keywords(&mut self, datas: &[*const c_char]) {
pub fn add_multi_keywords(&mut self, datas: &[*const c_char], offset: i64) {
let mut document = Document::default();
for element in datas {
let data = unsafe {
CStr::from_ptr(*element)
};
let data = unsafe { CStr::from_ptr(*element) };
document.add_field_value(self.field, data.to_str().unwrap());
}
document.add_i64(self.id_field, offset);
self.index_writer.add_document(document).unwrap();
}
pub fn finish(self) {
self.index_writer
.finalize()
.expect("failed to build inverted index");
fn manual_merge(&mut self) {
let metas = self
.index_writer
.index()
.searchable_segment_metas()
.unwrap();
let policy = self.index_writer.get_merge_policy();
let candidates = policy.compute_merge_candidates(metas.as_slice());
for candidate in candidates {
self.index_writer
.merge(candidate.0.as_slice())
.wait()
.unwrap();
}
}
pub fn finish(mut self) {
self.index_writer.commit().unwrap();
// self.manual_merge();
block_on(self.index_writer.garbage_collect_files()).unwrap();
self.index_writer.wait_merging_threads().unwrap();
}
pub(crate) fn commit(&mut self) {
self.index_writer.commit().unwrap();
}
}

View File

@ -12,6 +12,8 @@ pub extern "C" fn tantivy_create_index(
field_name: *const c_char,
data_type: TantivyDataType,
path: *const c_char,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> *mut c_void {
let field_name_str = unsafe { CStr::from_ptr(field_name) };
let path_str = unsafe { CStr::from_ptr(path) };
@ -19,6 +21,8 @@ pub extern "C" fn tantivy_create_index(
String::from(field_name_str.to_str().unwrap()),
data_type,
String::from(path_str.to_str().unwrap()),
num_threads,
overall_memory_budget_in_bytes,
);
create_binding(wrapper)
}
@ -38,78 +42,113 @@ pub extern "C" fn tantivy_finish_index(ptr: *mut c_void) {
// -------------------------build--------------------
#[no_mangle]
pub extern "C" fn tantivy_index_add_int8s(ptr: *mut c_void, array: *const i8, len: usize) {
pub extern "C" fn tantivy_index_add_int8s(
ptr: *mut c_void,
array: *const i8,
len: usize,
offset_begin: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
for data in arr {
(*real).add_i8(*data);
for (index, data) in arr.iter().enumerate() {
(*real).add_i8(*data, offset_begin + (index as i64));
}
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_int16s(ptr: *mut c_void, array: *const i16, len: usize) {
pub extern "C" fn tantivy_index_add_int16s(
ptr: *mut c_void,
array: *const i16,
len: usize,
offset_begin: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
for data in arr {
(*real).add_i16(*data);
for (index, data) in arr.iter().enumerate() {
(*real).add_i16(*data, offset_begin + (index as i64));
}
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_int32s(ptr: *mut c_void, array: *const i32, len: usize) {
pub extern "C" fn tantivy_index_add_int32s(
ptr: *mut c_void,
array: *const i32,
len: usize,
offset_begin: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
for data in arr {
(*real).add_i32(*data);
for (index, data) in arr.iter().enumerate() {
(*real).add_i32(*data, offset_begin + (index as i64));
}
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_int64s(ptr: *mut c_void, array: *const i64, len: usize) {
pub extern "C" fn tantivy_index_add_int64s(
ptr: *mut c_void,
array: *const i64,
len: usize,
offset_begin: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
for data in arr {
(*real).add_i64(*data);
for (index, data) in arr.iter().enumerate() {
(*real).add_i64(*data, offset_begin + (index as i64));
}
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_f32s(ptr: *mut c_void, array: *const f32, len: usize) {
pub extern "C" fn tantivy_index_add_f32s(
ptr: *mut c_void,
array: *const f32,
len: usize,
offset_begin: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
for data in arr {
(*real).add_f32(*data);
for (index, data) in arr.iter().enumerate() {
(*real).add_f32(*data, offset_begin + (index as i64));
}
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_f64s(ptr: *mut c_void, array: *const f64, len: usize) {
pub extern "C" fn tantivy_index_add_f64s(
ptr: *mut c_void,
array: *const f64,
len: usize,
offset_begin: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
for data in arr {
(*real).add_f64(*data);
for (index, data) in arr.iter().enumerate() {
(*real).add_f64(*data, offset_begin + (index as i64));
}
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_bools(ptr: *mut c_void, array: *const bool, len: usize) {
pub extern "C" fn tantivy_index_add_bools(
ptr: *mut c_void,
array: *const bool,
len: usize,
offset_begin: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
for data in arr {
(*real).add_bool(*data);
for (index, data) in arr.iter().enumerate() {
(*real).add_bool(*data, offset_begin + (index as i64));
}
}
}
@ -117,82 +156,122 @@ pub extern "C" fn tantivy_index_add_bools(ptr: *mut c_void, array: *const bool,
// TODO: this is not a very efficient way, since we must call this function many times, which
// will bring a lot of overhead caused by the rust binding.
#[no_mangle]
pub extern "C" fn tantivy_index_add_keyword(ptr: *mut c_void, s: *const c_char) {
pub extern "C" fn tantivy_index_add_string(ptr: *mut c_void, s: *const c_char, offset: i64) {
let real = ptr as *mut IndexWriterWrapper;
let c_str = unsafe { CStr::from_ptr(s) };
unsafe { (*real).add_keyword(c_str.to_str().unwrap()) }
unsafe { (*real).add_string(c_str.to_str().unwrap(), offset) }
}
// --------------------------------------------- array ------------------------------------------
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int8s(ptr: *mut c_void, array: *const i8, len: usize) {
pub extern "C" fn tantivy_index_add_multi_int8s(
ptr: *mut c_void,
array: *const i8,
len: usize,
offset: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len);
(*real).add_multi_i8s(arr)
(*real).add_multi_i8s(arr, offset)
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int16s(ptr: *mut c_void, array: *const i16, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_i16s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int32s(ptr: *mut c_void, array: *const i32, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_i32s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int64s(ptr: *mut c_void, array: *const i64, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_i64s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_f32s(ptr: *mut c_void, array: *const f32, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_f32s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_f64s(ptr: *mut c_void, array: *const f64, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_f64s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_bools(ptr: *mut c_void, array: *const bool, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_bools(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_keywords(ptr: *mut c_void, array: *const *const c_char, len: usize) {
pub extern "C" fn tantivy_index_add_multi_int16s(
ptr: *mut c_void,
array: *const i16,
len: usize,
offset: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len);
(*real).add_multi_keywords(arr)
(*real).add_multi_i16s(arr, offset);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int32s(
ptr: *mut c_void,
array: *const i32,
len: usize,
offset: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len);
(*real).add_multi_i32s(arr, offset);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int64s(
ptr: *mut c_void,
array: *const i64,
len: usize,
offset: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len);
(*real).add_multi_i64s(arr, offset);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_f32s(
ptr: *mut c_void,
array: *const f32,
len: usize,
offset: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len);
(*real).add_multi_f32s(arr, offset);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_f64s(
ptr: *mut c_void,
array: *const f64,
len: usize,
offset: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len);
(*real).add_multi_f64s(arr, offset);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_bools(
ptr: *mut c_void,
array: *const bool,
len: usize,
offset: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len);
(*real).add_multi_bools(arr, offset);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_keywords(
ptr: *mut c_void,
array: *const *const c_char,
len: usize,
offset: i64,
) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len);
(*real).add_multi_keywords(arr, offset)
}
}

View File

@ -1,5 +1,7 @@
mod array;
mod data_type;
mod demo_c;
mod docid_collector;
mod hashset_collector;
mod index_reader;
mod index_reader_c;
@ -10,7 +12,6 @@ mod log;
mod util;
mod util_c;
mod vec_collector;
mod demo_c;
pub fn add(left: usize, right: usize) -> usize {
left + right

View File

@ -1,66 +1,18 @@
#include <assert.h>
#include <sstream>
#include <fmt/format.h>
#include <set>
#include <iostream>
#include <map>
#include "tantivy-binding.h"
#include "rust-binding.h"
#include "rust-array.h"
namespace milvus::tantivy {
struct RustArrayWrapper {
explicit RustArrayWrapper(RustArray array) : array_(array) {
}
RustArrayWrapper(RustArrayWrapper&) = delete;
RustArrayWrapper&
operator=(RustArrayWrapper&) = delete;
RustArrayWrapper(RustArrayWrapper&& other) noexcept {
array_.array = other.array_.array;
array_.len = other.array_.len;
array_.cap = other.array_.cap;
other.array_.array = nullptr;
other.array_.len = 0;
other.array_.cap = 0;
}
RustArrayWrapper&
operator=(RustArrayWrapper&& other) noexcept {
if (this != &other) {
free();
array_.array = other.array_.array;
array_.len = other.array_.len;
array_.cap = other.array_.cap;
other.array_.array = nullptr;
other.array_.len = 0;
other.array_.cap = 0;
}
return *this;
}
~RustArrayWrapper() {
free();
}
void
debug() {
std::stringstream ss;
ss << "[ ";
for (int i = 0; i < array_.len; i++) {
ss << array_.array[i] << " ";
}
ss << "]";
std::cout << ss.str() << std::endl;
}
RustArray array_;
private:
void
free() {
if (array_.array != nullptr) {
free_rust_array(array_);
}
}
};
static constexpr uintptr_t DEFAULT_NUM_THREADS = 4;
static constexpr uintptr_t DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES =
DEFAULT_NUM_THREADS * 15 * 1024 * 1024;
template <typename T>
inline TantivyDataType
@ -81,15 +33,14 @@ guess_data_type() {
typeid(T).name());
}
// TODO: should split this into IndexWriter & IndexReader.
struct TantivyIndexWrapper {
using IndexWriter = void*;
using IndexReader = void*;
TantivyIndexWrapper() = default;
NO_COPY_OR_ASSIGN(TantivyIndexWrapper);
TantivyIndexWrapper(TantivyIndexWrapper&) = delete;
TantivyIndexWrapper&
operator=(TantivyIndexWrapper&) = delete;
TantivyIndexWrapper() = default;
TantivyIndexWrapper(TantivyIndexWrapper&& other) noexcept {
writer_ = other.writer_;
@ -120,11 +71,19 @@ struct TantivyIndexWrapper {
TantivyIndexWrapper(const char* field_name,
TantivyDataType data_type,
const char* path) {
writer_ = tantivy_create_index(field_name, data_type, path);
const char* path,
uintptr_t num_threads = DEFAULT_NUM_THREADS,
uintptr_t overall_memory_budget_in_bytes =
DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) {
writer_ = tantivy_create_index(field_name,
data_type,
path,
num_threads,
overall_memory_budget_in_bytes);
path_ = std::string(path);
}
// load index. create index reader.
explicit TantivyIndexWrapper(const char* path) {
assert(tantivy_index_exist(path));
reader_ = tantivy_load_index(path);
@ -137,49 +96,51 @@ struct TantivyIndexWrapper {
template <typename T>
void
add_data(const T* array, uintptr_t len) {
add_data(const T* array, uintptr_t len, int64_t offset_begin) {
assert(!finished_);
if constexpr (std::is_same_v<T, bool>) {
tantivy_index_add_bools(writer_, array, len);
tantivy_index_add_bools(writer_, array, len, offset_begin);
return;
}
if constexpr (std::is_same_v<T, int8_t>) {
tantivy_index_add_int8s(writer_, array, len);
tantivy_index_add_int8s(writer_, array, len, offset_begin);
return;
}
if constexpr (std::is_same_v<T, int16_t>) {
tantivy_index_add_int16s(writer_, array, len);
tantivy_index_add_int16s(writer_, array, len, offset_begin);
return;
}
if constexpr (std::is_same_v<T, int32_t>) {
tantivy_index_add_int32s(writer_, array, len);
tantivy_index_add_int32s(writer_, array, len, offset_begin);
return;
}
if constexpr (std::is_same_v<T, int64_t>) {
tantivy_index_add_int64s(writer_, array, len);
tantivy_index_add_int64s(writer_, array, len, offset_begin);
return;
}
if constexpr (std::is_same_v<T, float>) {
tantivy_index_add_f32s(writer_, array, len);
tantivy_index_add_f32s(writer_, array, len, offset_begin);
return;
}
if constexpr (std::is_same_v<T, double>) {
tantivy_index_add_f64s(writer_, array, len);
tantivy_index_add_f64s(writer_, array, len, offset_begin);
return;
}
if constexpr (std::is_same_v<T, std::string>) {
// TODO: not very efficient, a lot of overhead due to rust-ffi call.
for (uintptr_t i = 0; i < len; i++) {
tantivy_index_add_keyword(
writer_, static_cast<const std::string*>(array)[i].c_str());
tantivy_index_add_string(
writer_,
static_cast<const std::string*>(array)[i].c_str(),
offset_begin + i);
}
return;
}
@ -190,41 +151,41 @@ struct TantivyIndexWrapper {
template <typename T>
void
add_multi_data(const T* array, uintptr_t len) {
add_multi_data(const T* array, uintptr_t len, int64_t offset) {
assert(!finished_);
if constexpr (std::is_same_v<T, bool>) {
tantivy_index_add_multi_bools(writer_, array, len);
tantivy_index_add_multi_bools(writer_, array, len, offset);
return;
}
if constexpr (std::is_same_v<T, int8_t>) {
tantivy_index_add_multi_int8s(writer_, array, len);
tantivy_index_add_multi_int8s(writer_, array, len, offset);
return;
}
if constexpr (std::is_same_v<T, int16_t>) {
tantivy_index_add_multi_int16s(writer_, array, len);
tantivy_index_add_multi_int16s(writer_, array, len, offset);
return;
}
if constexpr (std::is_same_v<T, int32_t>) {
tantivy_index_add_multi_int32s(writer_, array, len);
tantivy_index_add_multi_int32s(writer_, array, len, offset);
return;
}
if constexpr (std::is_same_v<T, int64_t>) {
tantivy_index_add_multi_int64s(writer_, array, len);
tantivy_index_add_multi_int64s(writer_, array, len, offset);
return;
}
if constexpr (std::is_same_v<T, float>) {
tantivy_index_add_multi_f32s(writer_, array, len);
tantivy_index_add_multi_f32s(writer_, array, len, offset);
return;
}
if constexpr (std::is_same_v<T, double>) {
tantivy_index_add_multi_f64s(writer_, array, len);
tantivy_index_add_multi_f64s(writer_, array, len, offset);
return;
}
@ -233,7 +194,8 @@ struct TantivyIndexWrapper {
for (uintptr_t i = 0; i < len; i++) {
views.push_back(array[i].c_str());
}
tantivy_index_add_multi_keywords(writer_, views.data(), len);
tantivy_index_add_multi_keywords(
writer_, views.data(), len, offset);
return;
}
@ -244,12 +206,14 @@ struct TantivyIndexWrapper {
inline void
finish() {
if (!finished_) {
tantivy_finish_index(writer_);
writer_ = nullptr;
reader_ = tantivy_load_index(path_.c_str());
finished_ = true;
if (finished_) {
return;
}
tantivy_finish_index(writer_);
writer_ = nullptr;
reader_ = tantivy_load_index(path_.c_str());
finished_ = true;
}
inline uint32_t

View File

@ -33,7 +33,7 @@ run() {
T arr[] = {1, 2, 3, 4, 5, 6};
auto l = sizeof(arr) / sizeof(T);
w.add_data(arr, l);
w.add_data(arr, l, 0);
w.finish();
@ -83,7 +83,7 @@ run<bool>() {
bool arr[] = {true, false, false, true, false, true};
auto l = sizeof(arr) / sizeof(bool);
w.add_data(arr, l);
w.add_data(arr, l, 0);
w.finish();
@ -118,7 +118,7 @@ run<std::string>() {
std::vector<std::string> arr = {"a", "b", "aaa", "abbb"};
auto l = arr.size();
w.add_data<std::string>(arr.data(), l);
w.add_data<std::string>(arr.data(), l, 0);
w.finish();
@ -188,7 +188,7 @@ test_32717() {
inverted[n].insert(i);
}
w.add_data(arr.data(), l);
w.add_data(arr.data(), l, 0);
w.finish();
assert(w.count() == l);
@ -233,8 +233,9 @@ test_array_int() {
{10, 50, 60},
};
int64_t offset = 0;
for (const auto& arr : vec_of_array) {
w.add_multi_data(arr.data(), arr.size());
w.add_multi_data(arr.data(), arr.size(), offset++);
}
w.finish();
@ -263,8 +264,9 @@ test_array_string() {
{"10", "50", "60"},
};
int64_t offset = 0;
for (const auto& arr : vec_of_array) {
w.add_multi_data(arr.data(), arr.size());
w.add_multi_data(arr.data(), arr.size(), offset++);
}
w.finish();

View File

@ -230,14 +230,16 @@ class ArrayBitmapIndexTest : public testing::Test {
config["insert_files"] = std::vector<std::string>{log_path};
config["bitmap_cardinality_limit"] = "100";
auto build_index =
indexbuilder::IndexFactory::GetInstance().CreateIndex(
DataType::ARRAY, config, ctx);
build_index->Build();
{
auto build_index =
indexbuilder::IndexFactory::GetInstance().CreateIndex(
DataType::ARRAY, config, ctx);
build_index->Build();
auto binary_set = build_index->Upload();
for (const auto& [key, _] : binary_set.binary_map_) {
index_files.push_back(key);
auto binary_set = build_index->Upload();
for (const auto& [key, _] : binary_set.binary_map_) {
index_files.push_back(key);
}
}
index::CreateIndexInfo index_info{};

View File

@ -143,14 +143,16 @@ class HybridIndexTestV1 : public testing::Test {
config["insert_files"] = std::vector<std::string>{log_path};
config["bitmap_cardinality_limit"] = "1000";
auto build_index =
indexbuilder::IndexFactory::GetInstance().CreateIndex(
type_, config, ctx);
build_index->Build();
{
auto build_index =
indexbuilder::IndexFactory::GetInstance().CreateIndex(
type_, config, ctx);
build_index->Build();
auto binary_set = build_index->Upload();
for (const auto& [key, _] : binary_set.binary_map_) {
index_files.push_back(key);
auto binary_set = build_index->Upload();
for (const auto& [key, _] : binary_set.binary_map_) {
index_files.push_back(key);
}
}
index::CreateIndexInfo index_info{};