diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp index 994c8b0c05..b2c4236514 100644 --- a/internal/core/src/index/InvertedIndexTantivy.cpp +++ b/internal/core/src/index/InvertedIndexTantivy.cpp @@ -420,10 +420,10 @@ InvertedIndexTantivy::BuildWithRawData(size_t n, // only used in ut. auto arr = static_cast*>(values); for (size_t i = 0; i < n; i++) { - wrapper_->template add_multi_data(arr[i].data(), arr[i].size()); + wrapper_->template add_multi_data(arr[i].data(), arr[i].size(), i); } } else { - wrapper_->add_data(static_cast(values), n); + wrapper_->add_data(static_cast(values), n, 0); } finish(); } @@ -449,20 +449,27 @@ InvertedIndexTantivy::BuildWithFieldData( case proto::schema::DataType::Double: case proto::schema::DataType::String: case proto::schema::DataType::VarChar: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - if (schema_.nullable()) { + int64_t offset = 0; + if (schema_.nullable()) { + for (const auto& data : field_datas) { + auto n = data->get_num_rows(); for (int i = 0; i < n; i++) { if (!data->is_valid(i)) { null_offset.push_back(i); } wrapper_->add_multi_data( static_cast(data->RawValue(i)), - data->is_valid(i)); + data->is_valid(i), + offset++); } - continue; } - wrapper_->add_data(static_cast(data->Data()), n); + } else { + for (const auto& data : field_datas) { + auto n = data->get_num_rows(); + wrapper_->add_data( + static_cast(data->Data()), n, offset); + offset += n; + } } break; } @@ -483,6 +490,7 @@ template void InvertedIndexTantivy::build_index_for_array( const std::vector>& field_datas) { + int64_t offset = 0; for (const auto& data : field_datas) { auto n = data->get_num_rows(); auto array_column = static_cast(data->Data()); @@ -494,7 +502,9 @@ InvertedIndexTantivy::build_index_for_array( } auto length = data->is_valid(i) ? array_column[i].length() : 0; wrapper_->template add_multi_data( - reinterpret_cast(array_column[i].data()), length); + reinterpret_cast(array_column[i].data()), + length, + offset++); } } } @@ -503,6 +513,7 @@ template <> void InvertedIndexTantivy::build_index_for_array( const std::vector>& field_datas) { + int64_t offset = 0; for (const auto& data : field_datas) { auto n = data->get_num_rows(); auto array_column = static_cast(data->Data()); @@ -519,7 +530,7 @@ InvertedIndexTantivy::build_index_for_array( array_column[i].template get_data(j)); } auto length = data->is_valid(i) ? output.size() : 0; - wrapper_->template add_multi_data(output.data(), length); + wrapper_->template add_multi_data(output.data(), length, offset++); } } } diff --git a/internal/core/thirdparty/tantivy/CMakeLists.txt b/internal/core/thirdparty/tantivy/CMakeLists.txt index 3963647eee..46371c9dc7 100644 --- a/internal/core/thirdparty/tantivy/CMakeLists.txt +++ b/internal/core/thirdparty/tantivy/CMakeLists.txt @@ -34,7 +34,7 @@ add_custom_target(tantivy_binding_target DEPENDS compile_tantivy ls_cargo_target set(INSTALL_COMMAND cp ${LIB_HEADER_FOLDER}/tantivy-binding.h ${TANTIVY_INCLUDE_DIR}/ && - cp ${CMAKE_CURRENT_SOURCE_DIR}/tantivy-wrapper.h ${TANTIVY_INCLUDE_DIR}/ && + cp ${CMAKE_CURRENT_SOURCE_DIR}/*.h ${TANTIVY_INCLUDE_DIR}/ && cp ${LIB_FILE} ${TANTIVY_LIB_DIR}/) add_custom_command(OUTPUT install_tantivy COMMENT "Install tantivy target ${LIB_FILE} to ${TANTIVY_LIB_DIR}" diff --git a/internal/core/thirdparty/tantivy/bench.cpp b/internal/core/thirdparty/tantivy/bench.cpp index 8b8defd403..c8136f7993 100644 --- a/internal/core/thirdparty/tantivy/bench.cpp +++ b/internal/core/thirdparty/tantivy/bench.cpp @@ -29,7 +29,7 @@ build_index(size_t n = 1000000) { arr.push_back(std::to_string(x)); } - w.add_data(arr.data(), arr.size()); + w.add_data(arr.data(), arr.size(), 0); w.finish(); assert(w.count() == n); diff --git a/internal/core/thirdparty/tantivy/rust-array.h b/internal/core/thirdparty/tantivy/rust-array.h new file mode 100644 index 0000000000..ba9baecc1f --- /dev/null +++ b/internal/core/thirdparty/tantivy/rust-array.h @@ -0,0 +1,65 @@ +#pragma once + +#include +#include + +#include "tantivy-binding.h" +#include "rust-binding.h" + +namespace milvus::tantivy { + +struct RustArrayWrapper { + NO_COPY_OR_ASSIGN(RustArrayWrapper); + + explicit RustArrayWrapper(RustArray array) : array_(array) { + } + + RustArrayWrapper(RustArrayWrapper&& other) noexcept { + array_.array = other.array_.array; + array_.len = other.array_.len; + array_.cap = other.array_.cap; + other.array_.array = nullptr; + other.array_.len = 0; + other.array_.cap = 0; + } + + RustArrayWrapper& + operator=(RustArrayWrapper&& other) noexcept { + if (this != &other) { + free(); + array_.array = other.array_.array; + array_.len = other.array_.len; + array_.cap = other.array_.cap; + other.array_.array = nullptr; + other.array_.len = 0; + other.array_.cap = 0; + } + return *this; + } + + ~RustArrayWrapper() { + free(); + } + + void + debug() { + std::stringstream ss; + ss << "[ "; + for (int i = 0; i < array_.len; i++) { + ss << array_.array[i] << " "; + } + ss << "]"; + std::cout << ss.str() << std::endl; + } + + RustArray array_; + + private: + void + free() { + if (array_.array != nullptr) { + free_rust_array(array_); + } + } +}; +} // namespace milvus::tantivy diff --git a/internal/core/thirdparty/tantivy/rust-binding.h b/internal/core/thirdparty/tantivy/rust-binding.h new file mode 100644 index 0000000000..b5001c7735 --- /dev/null +++ b/internal/core/thirdparty/tantivy/rust-binding.h @@ -0,0 +1,7 @@ +#pragma once + +namespace milvus::tantivy { +#define NO_COPY_OR_ASSIGN(ClassName) \ + ClassName(const ClassName&) = delete; \ + ClassName& operator=(const ClassName&) = delete; +} // namespace milvus::tantivy diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h index 045d4a50e6..782adeb346 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h +++ b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h @@ -23,6 +23,8 @@ extern "C" { void free_rust_array(RustArray array); +void print_vector_of_strings(const char *const *ptr, uintptr_t len); + void *tantivy_load_index(const char *path); void tantivy_free_index_reader(void *ptr); @@ -75,46 +77,51 @@ RustArray tantivy_prefix_query_keyword(void *ptr, const char *prefix); RustArray tantivy_regex_query(void *ptr, const char *pattern); -void *tantivy_create_index(const char *field_name, TantivyDataType data_type, const char *path); +void *tantivy_create_index(const char *field_name, + TantivyDataType data_type, + const char *path, + uintptr_t num_threads, + uintptr_t overall_memory_budget_in_bytes); void tantivy_free_index_writer(void *ptr); void tantivy_finish_index(void *ptr); -void tantivy_index_add_int8s(void *ptr, const int8_t *array, uintptr_t len); +void tantivy_index_add_int8s(void *ptr, const int8_t *array, uintptr_t len, int64_t offset_begin); -void tantivy_index_add_int16s(void *ptr, const int16_t *array, uintptr_t len); +void tantivy_index_add_int16s(void *ptr, const int16_t *array, uintptr_t len, int64_t offset_begin); -void tantivy_index_add_int32s(void *ptr, const int32_t *array, uintptr_t len); +void tantivy_index_add_int32s(void *ptr, const int32_t *array, uintptr_t len, int64_t offset_begin); -void tantivy_index_add_int64s(void *ptr, const int64_t *array, uintptr_t len); +void tantivy_index_add_int64s(void *ptr, const int64_t *array, uintptr_t len, int64_t offset_begin); -void tantivy_index_add_f32s(void *ptr, const float *array, uintptr_t len); +void tantivy_index_add_f32s(void *ptr, const float *array, uintptr_t len, int64_t offset_begin); -void tantivy_index_add_f64s(void *ptr, const double *array, uintptr_t len); +void tantivy_index_add_f64s(void *ptr, const double *array, uintptr_t len, int64_t offset_begin); -void tantivy_index_add_bools(void *ptr, const bool *array, uintptr_t len); +void tantivy_index_add_bools(void *ptr, const bool *array, uintptr_t len, int64_t offset_begin); -void tantivy_index_add_keyword(void *ptr, const char *s); +void tantivy_index_add_string(void *ptr, const char *s, int64_t offset); -void tantivy_index_add_multi_int8s(void *ptr, const int8_t *array, uintptr_t len); +void tantivy_index_add_multi_int8s(void *ptr, const int8_t *array, uintptr_t len, int64_t offset); -void tantivy_index_add_multi_int16s(void *ptr, const int16_t *array, uintptr_t len); +void tantivy_index_add_multi_int16s(void *ptr, const int16_t *array, uintptr_t len, int64_t offset); -void tantivy_index_add_multi_int32s(void *ptr, const int32_t *array, uintptr_t len); +void tantivy_index_add_multi_int32s(void *ptr, const int32_t *array, uintptr_t len, int64_t offset); -void tantivy_index_add_multi_int64s(void *ptr, const int64_t *array, uintptr_t len); +void tantivy_index_add_multi_int64s(void *ptr, const int64_t *array, uintptr_t len, int64_t offset); -void tantivy_index_add_multi_f32s(void *ptr, const float *array, uintptr_t len); +void tantivy_index_add_multi_f32s(void *ptr, const float *array, uintptr_t len, int64_t offset); -void tantivy_index_add_multi_f64s(void *ptr, const double *array, uintptr_t len); +void tantivy_index_add_multi_f64s(void *ptr, const double *array, uintptr_t len, int64_t offset); -void tantivy_index_add_multi_bools(void *ptr, const bool *array, uintptr_t len); +void tantivy_index_add_multi_bools(void *ptr, const bool *array, uintptr_t len, int64_t offset); -void tantivy_index_add_multi_keywords(void *ptr, const char *const *array, uintptr_t len); +void tantivy_index_add_multi_keywords(void *ptr, + const char *const *array, + uintptr_t len, + int64_t offset); bool tantivy_index_exist(const char *path); -void print_vector_of_strings(const char *const *ptr, uintptr_t len); - } // extern "C" diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/demo_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/demo_c.rs index 257a41f17a..af4412145d 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/demo_c.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/demo_c.rs @@ -1,14 +1,13 @@ -use std::{ffi::{c_char, CStr}, slice}; +use std::{ + ffi::{c_char, CStr}, + slice, +}; #[no_mangle] pub extern "C" fn print_vector_of_strings(ptr: *const *const c_char, len: usize) { - let arr : &[*const c_char] = unsafe { - slice::from_raw_parts(ptr, len) - }; - for element in arr { - let c_str = unsafe { - CStr::from_ptr(*element) - }; - println!("{}", c_str.to_str().unwrap()); - } -} \ No newline at end of file + let arr: &[*const c_char] = unsafe { slice::from_raw_parts(ptr, len) }; + for element in arr { + let c_str = unsafe { CStr::from_ptr(*element) }; + println!("{}", c_str.to_str().unwrap()); + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/docid_collector.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/docid_collector.rs new file mode 100644 index 0000000000..95d585b436 --- /dev/null +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/docid_collector.rs @@ -0,0 +1,60 @@ +use tantivy::{ + collector::{Collector, SegmentCollector}, + fastfield::Column, + DocId, Score, SegmentOrdinal, SegmentReader, +}; + +pub(crate) struct DocIdCollector; + +impl Collector for DocIdCollector { + type Fruit = Vec; + type Child = DocIdChildCollector; + + fn for_segment( + &self, + _segment_local_id: SegmentOrdinal, + segment: &SegmentReader, + ) -> tantivy::Result { + Ok(DocIdChildCollector { + docs: Vec::new(), + column: segment.fast_fields().i64("doc_id").unwrap(), + }) + } + + fn requires_scoring(&self) -> bool { + false + } + + fn merge_fruits( + &self, + segment_fruits: Vec<::Fruit>, + ) -> tantivy::Result { + let len: usize = segment_fruits.iter().map(|docset| docset.len()).sum(); + let mut result = Vec::with_capacity(len); + for docs in segment_fruits { + for doc in docs { + result.push(doc); + } + } + Ok(result) + } +} + +pub(crate) struct DocIdChildCollector { + docs: Vec, + column: Column, +} + +impl SegmentCollector for DocIdChildCollector { + type Fruit = Vec; + + fn collect(&mut self, doc: DocId, _score: Score) { + self.column.values_for_doc(doc).for_each(|doc_id| { + self.docs.push(doc_id as u32); + }) + } + + fn harvest(self) -> Self::Fruit { + self.docs + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs index b00c5ceda9..3ac514759d 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs @@ -1,62 +1,82 @@ use std::ops::Bound; -use std::str::FromStr; +use std::sync::Arc; -use tantivy::directory::MmapDirectory; use tantivy::query::{Query, RangeQuery, RegexQuery, TermQuery}; use tantivy::schema::{Field, IndexRecordOption}; use tantivy::{Index, IndexReader, ReloadPolicy, Term}; +use crate::docid_collector::DocIdCollector; use crate::log::init_log; use crate::util::make_bounds; use crate::vec_collector::VecCollector; -pub struct IndexReaderWrapper { - pub field_name: String, - pub field: Field, - pub reader: IndexReader, - pub cnt: u32, +pub(crate) struct IndexReaderWrapper { + pub(crate) field_name: String, + pub(crate) field: Field, + pub(crate) reader: IndexReader, + pub(crate) index: Arc, + pub(crate) id_field: Option, } impl IndexReaderWrapper { - pub fn new(index: &Index, field_name: &String, field: Field) -> IndexReaderWrapper { + pub fn load(path: &str) -> IndexReaderWrapper { init_log(); + let index = Index::open_in_dir(path).unwrap(); + + IndexReaderWrapper::from_index(Arc::new(index)) + } + + pub fn from_index(index: Arc) -> IndexReaderWrapper { + let field = index.schema().fields().next().unwrap().0; + let schema = index.schema(); + let field_name = String::from(schema.get_field_name(field)); + let id_field: Option = match schema.get_field("doc_id") { + Ok(field) => Some(field), + Err(_) => None, + }; + let reader = index .reader_builder() - .reload_policy(ReloadPolicy::Manual) + .reload_policy(ReloadPolicy::OnCommit) // OnCommit serve for growing segment. .try_into() .unwrap(); - let metas = index.searchable_segment_metas().unwrap(); + reader.reload().unwrap(); + + IndexReaderWrapper { + field_name, + field, + reader, + index, + id_field, + } + } + + pub fn reload(&self) { + self.reader.reload().unwrap(); + } + + pub fn count(&self) -> u32 { + let metas = self.index.searchable_segment_metas().unwrap(); let mut sum: u32 = 0; for meta in metas { sum += meta.max_doc(); } - reader.reload().unwrap(); - IndexReaderWrapper { - field_name: field_name.to_string(), - field, - reader, - cnt: sum, - } + sum } - pub fn load(path: &str) -> IndexReaderWrapper { - let dir = MmapDirectory::open(path).unwrap(); - let index = Index::open(dir).unwrap(); - let field = index.schema().fields().next().unwrap().0; - let schema = index.schema(); - let field_name = schema.get_field_name(field); - IndexReaderWrapper::new(&index, &String::from_str(field_name).unwrap(), field) - } - - pub fn count(&self) -> u32 { - self.cnt - } - - fn search(&self, q: &dyn Query) -> Vec { + pub(crate) fn search(&self, q: &dyn Query) -> Vec { let searcher = self.reader.searcher(); - let hits = searcher.search(q, &VecCollector).unwrap(); - hits + match self.id_field { + Some(_) => { + // newer version with doc_id. + searcher.search(q, &DocIdCollector {}).unwrap() + } + None => { + // older version without doc_id, only one segment. + searcher.search(q, &VecCollector {}).unwrap() + } + } } pub fn term_query_i64(&self, term: i64) -> Vec { diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs index 2c8d56bf38..7c2601df4f 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs @@ -1,23 +1,33 @@ use std::ffi::CStr; +use std::sync::Arc; +use futures::executor::block_on; use libc::c_char; -use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, INDEXED}; -use tantivy::{doc, tokenizer, Index, SingleSegmentIndexWriter, Document}; +use tantivy::schema::{ + Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, FAST, INDEXED, +}; +use tantivy::{doc, tokenizer, Document, Index, IndexWriter}; use crate::data_type::TantivyDataType; +use crate::index_reader::IndexReaderWrapper; use crate::log::init_log; -pub struct IndexWriterWrapper { - pub field_name: String, - pub field: Field, - pub data_type: TantivyDataType, - pub path: String, - pub index_writer: SingleSegmentIndexWriter, +pub(crate) struct IndexWriterWrapper { + pub(crate) field: Field, + pub(crate) index_writer: IndexWriter, + pub(crate) id_field: Field, + pub(crate) index: Arc, } impl IndexWriterWrapper { - pub fn new(field_name: String, data_type: TantivyDataType, path: String) -> IndexWriterWrapper { + pub fn new( + field_name: String, + data_type: TantivyDataType, + path: String, + num_threads: usize, + overall_memory_budget_in_bytes: usize, + ) -> IndexWriterWrapper { init_log(); let field: Field; @@ -42,6 +52,7 @@ impl IndexWriterWrapper { use_raw_tokenizer = true; } } + let id_field = schema_builder.add_i64_field("doc_id", FAST); let schema = schema_builder.build(); let index = Index::create_in_dir(path.clone(), schema).unwrap(); if use_raw_tokenizer { @@ -49,126 +60,166 @@ impl IndexWriterWrapper { .tokenizers() .register("raw_tokenizer", tokenizer::RawTokenizer::default()); } - let index_writer = SingleSegmentIndexWriter::new(index, 15 * 1024 * 1024).unwrap(); + let index_writer = index + .writer_with_num_threads(num_threads, overall_memory_budget_in_bytes) + .unwrap(); IndexWriterWrapper { - field_name, field, - data_type, - path, index_writer, + id_field, + index: Arc::new(index), } } - pub fn add_i8(&mut self, data: i8) { - self.add_i64(data.into()) + pub fn add_i8(&mut self, data: i8, offset: i64) { + self.add_i64(data.into(), offset) } - pub fn add_i16(&mut self, data: i16) { - self.add_i64(data.into()) + pub fn add_i16(&mut self, data: i16, offset: i64) { + self.add_i64(data.into(), offset) } - pub fn add_i32(&mut self, data: i32) { - self.add_i64(data.into()) + pub fn add_i32(&mut self, data: i32, offset: i64) { + self.add_i64(data.into(), offset) } - pub fn add_i64(&mut self, data: i64) { + pub fn add_i64(&mut self, data: i64, offset: i64) { self.index_writer - .add_document(doc!(self.field => data)) + .add_document(doc!( + self.field => data, + self.id_field => offset, + )) .unwrap(); } - pub fn add_f32(&mut self, data: f32) { - self.add_f64(data.into()) + pub fn add_f32(&mut self, data: f32, offset: i64) { + self.add_f64(data.into(), offset) } - pub fn add_f64(&mut self, data: f64) { + pub fn add_f64(&mut self, data: f64, offset: i64) { self.index_writer - .add_document(doc!(self.field => data)) + .add_document(doc!( + self.field => data, + self.id_field => offset, + )) .unwrap(); } - pub fn add_bool(&mut self, data: bool) { + pub fn add_bool(&mut self, data: bool, offset: i64) { self.index_writer - .add_document(doc!(self.field => data)) + .add_document(doc!( + self.field => data, + self.id_field => offset, + )) .unwrap(); } - pub fn add_keyword(&mut self, data: &str) { + pub fn add_string(&mut self, data: &str, offset: i64) { self.index_writer - .add_document(doc!(self.field => data)) + .add_document(doc!( + self.field => data, + self.id_field => offset, + )) .unwrap(); } - pub fn add_multi_i8s(&mut self, datas: &[i8]) { + pub fn add_multi_i8s(&mut self, datas: &[i8], offset: i64) { let mut document = Document::default(); for data in datas { document.add_field_value(self.field, *data as i64); } + document.add_i64(self.id_field, offset); self.index_writer.add_document(document).unwrap(); } - pub fn add_multi_i16s(&mut self, datas: &[i16]) { + pub fn add_multi_i16s(&mut self, datas: &[i16], offset: i64) { let mut document = Document::default(); for data in datas { document.add_field_value(self.field, *data as i64); } + document.add_i64(self.id_field, offset); self.index_writer.add_document(document).unwrap(); } - pub fn add_multi_i32s(&mut self, datas: &[i32]) { + pub fn add_multi_i32s(&mut self, datas: &[i32], offset: i64) { let mut document = Document::default(); for data in datas { document.add_field_value(self.field, *data as i64); } + document.add_i64(self.id_field, offset); self.index_writer.add_document(document).unwrap(); } - pub fn add_multi_i64s(&mut self, datas: &[i64]) { + pub fn add_multi_i64s(&mut self, datas: &[i64], offset: i64) { let mut document = Document::default(); for data in datas { document.add_field_value(self.field, *data); } + document.add_i64(self.id_field, offset); self.index_writer.add_document(document).unwrap(); } - pub fn add_multi_f32s(&mut self, datas: &[f32]) { + pub fn add_multi_f32s(&mut self, datas: &[f32], offset: i64) { let mut document = Document::default(); for data in datas { document.add_field_value(self.field, *data as f64); } + document.add_i64(self.id_field, offset); self.index_writer.add_document(document).unwrap(); } - pub fn add_multi_f64s(&mut self, datas: &[f64]) { + pub fn add_multi_f64s(&mut self, datas: &[f64], offset: i64) { let mut document = Document::default(); for data in datas { document.add_field_value(self.field, *data); } + document.add_i64(self.id_field, offset); self.index_writer.add_document(document).unwrap(); } - pub fn add_multi_bools(&mut self, datas: &[bool]) { + pub fn add_multi_bools(&mut self, datas: &[bool], offset: i64) { let mut document = Document::default(); for data in datas { document.add_field_value(self.field, *data); } + document.add_i64(self.id_field, offset); self.index_writer.add_document(document).unwrap(); } - pub fn add_multi_keywords(&mut self, datas: &[*const c_char]) { + pub fn add_multi_keywords(&mut self, datas: &[*const c_char], offset: i64) { let mut document = Document::default(); for element in datas { - let data = unsafe { - CStr::from_ptr(*element) - }; + let data = unsafe { CStr::from_ptr(*element) }; document.add_field_value(self.field, data.to_str().unwrap()); } + document.add_i64(self.id_field, offset); self.index_writer.add_document(document).unwrap(); } - pub fn finish(self) { - self.index_writer - .finalize() - .expect("failed to build inverted index"); + fn manual_merge(&mut self) { + let metas = self + .index_writer + .index() + .searchable_segment_metas() + .unwrap(); + let policy = self.index_writer.get_merge_policy(); + let candidates = policy.compute_merge_candidates(metas.as_slice()); + for candidate in candidates { + self.index_writer + .merge(candidate.0.as_slice()) + .wait() + .unwrap(); + } + } + + pub fn finish(mut self) { + self.index_writer.commit().unwrap(); + // self.manual_merge(); + block_on(self.index_writer.garbage_collect_files()).unwrap(); + self.index_writer.wait_merging_threads().unwrap(); + } + + pub(crate) fn commit(&mut self) { + self.index_writer.commit().unwrap(); } } diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs index b13f550d7c..7ffd4868e6 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs @@ -12,6 +12,8 @@ pub extern "C" fn tantivy_create_index( field_name: *const c_char, data_type: TantivyDataType, path: *const c_char, + num_threads: usize, + overall_memory_budget_in_bytes: usize, ) -> *mut c_void { let field_name_str = unsafe { CStr::from_ptr(field_name) }; let path_str = unsafe { CStr::from_ptr(path) }; @@ -19,6 +21,8 @@ pub extern "C" fn tantivy_create_index( String::from(field_name_str.to_str().unwrap()), data_type, String::from(path_str.to_str().unwrap()), + num_threads, + overall_memory_budget_in_bytes, ); create_binding(wrapper) } @@ -38,78 +42,113 @@ pub extern "C" fn tantivy_finish_index(ptr: *mut c_void) { // -------------------------build-------------------- #[no_mangle] -pub extern "C" fn tantivy_index_add_int8s(ptr: *mut c_void, array: *const i8, len: usize) { +pub extern "C" fn tantivy_index_add_int8s( + ptr: *mut c_void, + array: *const i8, + len: usize, + offset_begin: i64, +) { let real = ptr as *mut IndexWriterWrapper; let arr = unsafe { slice::from_raw_parts(array, len) }; unsafe { - for data in arr { - (*real).add_i8(*data); + for (index, data) in arr.iter().enumerate() { + (*real).add_i8(*data, offset_begin + (index as i64)); } } } #[no_mangle] -pub extern "C" fn tantivy_index_add_int16s(ptr: *mut c_void, array: *const i16, len: usize) { +pub extern "C" fn tantivy_index_add_int16s( + ptr: *mut c_void, + array: *const i16, + len: usize, + offset_begin: i64, +) { let real = ptr as *mut IndexWriterWrapper; let arr = unsafe { slice::from_raw_parts(array, len) }; unsafe { - for data in arr { - (*real).add_i16(*data); + for (index, data) in arr.iter().enumerate() { + (*real).add_i16(*data, offset_begin + (index as i64)); } } } #[no_mangle] -pub extern "C" fn tantivy_index_add_int32s(ptr: *mut c_void, array: *const i32, len: usize) { +pub extern "C" fn tantivy_index_add_int32s( + ptr: *mut c_void, + array: *const i32, + len: usize, + offset_begin: i64, +) { let real = ptr as *mut IndexWriterWrapper; let arr = unsafe { slice::from_raw_parts(array, len) }; unsafe { - for data in arr { - (*real).add_i32(*data); + for (index, data) in arr.iter().enumerate() { + (*real).add_i32(*data, offset_begin + (index as i64)); } } } #[no_mangle] -pub extern "C" fn tantivy_index_add_int64s(ptr: *mut c_void, array: *const i64, len: usize) { +pub extern "C" fn tantivy_index_add_int64s( + ptr: *mut c_void, + array: *const i64, + len: usize, + offset_begin: i64, +) { let real = ptr as *mut IndexWriterWrapper; let arr = unsafe { slice::from_raw_parts(array, len) }; unsafe { - for data in arr { - (*real).add_i64(*data); + for (index, data) in arr.iter().enumerate() { + (*real).add_i64(*data, offset_begin + (index as i64)); } } } #[no_mangle] -pub extern "C" fn tantivy_index_add_f32s(ptr: *mut c_void, array: *const f32, len: usize) { +pub extern "C" fn tantivy_index_add_f32s( + ptr: *mut c_void, + array: *const f32, + len: usize, + offset_begin: i64, +) { let real = ptr as *mut IndexWriterWrapper; let arr = unsafe { slice::from_raw_parts(array, len) }; unsafe { - for data in arr { - (*real).add_f32(*data); + for (index, data) in arr.iter().enumerate() { + (*real).add_f32(*data, offset_begin + (index as i64)); } } } #[no_mangle] -pub extern "C" fn tantivy_index_add_f64s(ptr: *mut c_void, array: *const f64, len: usize) { +pub extern "C" fn tantivy_index_add_f64s( + ptr: *mut c_void, + array: *const f64, + len: usize, + offset_begin: i64, +) { let real = ptr as *mut IndexWriterWrapper; let arr = unsafe { slice::from_raw_parts(array, len) }; unsafe { - for data in arr { - (*real).add_f64(*data); + for (index, data) in arr.iter().enumerate() { + (*real).add_f64(*data, offset_begin + (index as i64)); } } } #[no_mangle] -pub extern "C" fn tantivy_index_add_bools(ptr: *mut c_void, array: *const bool, len: usize) { +pub extern "C" fn tantivy_index_add_bools( + ptr: *mut c_void, + array: *const bool, + len: usize, + offset_begin: i64, +) { let real = ptr as *mut IndexWriterWrapper; let arr = unsafe { slice::from_raw_parts(array, len) }; unsafe { - for data in arr { - (*real).add_bool(*data); + for (index, data) in arr.iter().enumerate() { + (*real).add_bool(*data, offset_begin + (index as i64)); } } } @@ -117,82 +156,122 @@ pub extern "C" fn tantivy_index_add_bools(ptr: *mut c_void, array: *const bool, // TODO: this is not a very efficient way, since we must call this function many times, which // will bring a lot of overhead caused by the rust binding. #[no_mangle] -pub extern "C" fn tantivy_index_add_keyword(ptr: *mut c_void, s: *const c_char) { +pub extern "C" fn tantivy_index_add_string(ptr: *mut c_void, s: *const c_char, offset: i64) { let real = ptr as *mut IndexWriterWrapper; let c_str = unsafe { CStr::from_ptr(s) }; - unsafe { (*real).add_keyword(c_str.to_str().unwrap()) } + unsafe { (*real).add_string(c_str.to_str().unwrap(), offset) } } // --------------------------------------------- array ------------------------------------------ #[no_mangle] -pub extern "C" fn tantivy_index_add_multi_int8s(ptr: *mut c_void, array: *const i8, len: usize) { +pub extern "C" fn tantivy_index_add_multi_int8s( + ptr: *mut c_void, + array: *const i8, + len: usize, + offset: i64, +) { let real = ptr as *mut IndexWriterWrapper; unsafe { let arr = slice::from_raw_parts(array, len); - (*real).add_multi_i8s(arr) + (*real).add_multi_i8s(arr, offset) } } #[no_mangle] -pub extern "C" fn tantivy_index_add_multi_int16s(ptr: *mut c_void, array: *const i16, len: usize) { - let real = ptr as *mut IndexWriterWrapper; - unsafe { - let arr = slice::from_raw_parts(array, len) ; - (*real).add_multi_i16s(arr); - } -} - -#[no_mangle] -pub extern "C" fn tantivy_index_add_multi_int32s(ptr: *mut c_void, array: *const i32, len: usize) { - let real = ptr as *mut IndexWriterWrapper; - unsafe { - let arr = slice::from_raw_parts(array, len) ; - (*real).add_multi_i32s(arr); - } -} - -#[no_mangle] -pub extern "C" fn tantivy_index_add_multi_int64s(ptr: *mut c_void, array: *const i64, len: usize) { - let real = ptr as *mut IndexWriterWrapper; - unsafe { - let arr = slice::from_raw_parts(array, len) ; - (*real).add_multi_i64s(arr); - } -} - -#[no_mangle] -pub extern "C" fn tantivy_index_add_multi_f32s(ptr: *mut c_void, array: *const f32, len: usize) { - let real = ptr as *mut IndexWriterWrapper; - unsafe { - let arr = slice::from_raw_parts(array, len) ; - (*real).add_multi_f32s(arr); - } -} - -#[no_mangle] -pub extern "C" fn tantivy_index_add_multi_f64s(ptr: *mut c_void, array: *const f64, len: usize) { - let real = ptr as *mut IndexWriterWrapper; - unsafe { - let arr = slice::from_raw_parts(array, len) ; - (*real).add_multi_f64s(arr); - } -} - -#[no_mangle] -pub extern "C" fn tantivy_index_add_multi_bools(ptr: *mut c_void, array: *const bool, len: usize) { - let real = ptr as *mut IndexWriterWrapper; - unsafe { - let arr = slice::from_raw_parts(array, len) ; - (*real).add_multi_bools(arr); - } -} - -#[no_mangle] -pub extern "C" fn tantivy_index_add_multi_keywords(ptr: *mut c_void, array: *const *const c_char, len: usize) { +pub extern "C" fn tantivy_index_add_multi_int16s( + ptr: *mut c_void, + array: *const i16, + len: usize, + offset: i64, +) { let real = ptr as *mut IndexWriterWrapper; unsafe { let arr = slice::from_raw_parts(array, len); - (*real).add_multi_keywords(arr) + (*real).add_multi_i16s(arr, offset); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_int32s( + ptr: *mut c_void, + array: *const i32, + len: usize, + offset: i64, +) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len); + (*real).add_multi_i32s(arr, offset); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_int64s( + ptr: *mut c_void, + array: *const i64, + len: usize, + offset: i64, +) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len); + (*real).add_multi_i64s(arr, offset); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_f32s( + ptr: *mut c_void, + array: *const f32, + len: usize, + offset: i64, +) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len); + (*real).add_multi_f32s(arr, offset); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_f64s( + ptr: *mut c_void, + array: *const f64, + len: usize, + offset: i64, +) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len); + (*real).add_multi_f64s(arr, offset); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_bools( + ptr: *mut c_void, + array: *const bool, + len: usize, + offset: i64, +) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len); + (*real).add_multi_bools(arr, offset); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_keywords( + ptr: *mut c_void, + array: *const *const c_char, + len: usize, + offset: i64, +) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len); + (*real).add_multi_keywords(arr, offset) } } diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs index c6193de3f6..36bb21fd92 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs @@ -1,5 +1,7 @@ mod array; mod data_type; +mod demo_c; +mod docid_collector; mod hashset_collector; mod index_reader; mod index_reader_c; @@ -10,7 +12,6 @@ mod log; mod util; mod util_c; mod vec_collector; -mod demo_c; pub fn add(left: usize, right: usize) -> usize { left + right diff --git a/internal/core/thirdparty/tantivy/tantivy-wrapper.h b/internal/core/thirdparty/tantivy/tantivy-wrapper.h index 7574d3875c..796f264dd8 100644 --- a/internal/core/thirdparty/tantivy/tantivy-wrapper.h +++ b/internal/core/thirdparty/tantivy/tantivy-wrapper.h @@ -1,66 +1,18 @@ +#include #include #include #include #include +#include + #include "tantivy-binding.h" +#include "rust-binding.h" +#include "rust-array.h" namespace milvus::tantivy { -struct RustArrayWrapper { - explicit RustArrayWrapper(RustArray array) : array_(array) { - } - - RustArrayWrapper(RustArrayWrapper&) = delete; - RustArrayWrapper& - operator=(RustArrayWrapper&) = delete; - - RustArrayWrapper(RustArrayWrapper&& other) noexcept { - array_.array = other.array_.array; - array_.len = other.array_.len; - array_.cap = other.array_.cap; - other.array_.array = nullptr; - other.array_.len = 0; - other.array_.cap = 0; - } - - RustArrayWrapper& - operator=(RustArrayWrapper&& other) noexcept { - if (this != &other) { - free(); - array_.array = other.array_.array; - array_.len = other.array_.len; - array_.cap = other.array_.cap; - other.array_.array = nullptr; - other.array_.len = 0; - other.array_.cap = 0; - } - return *this; - } - - ~RustArrayWrapper() { - free(); - } - - void - debug() { - std::stringstream ss; - ss << "[ "; - for (int i = 0; i < array_.len; i++) { - ss << array_.array[i] << " "; - } - ss << "]"; - std::cout << ss.str() << std::endl; - } - - RustArray array_; - - private: - void - free() { - if (array_.array != nullptr) { - free_rust_array(array_); - } - } -}; +static constexpr uintptr_t DEFAULT_NUM_THREADS = 4; +static constexpr uintptr_t DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES = + DEFAULT_NUM_THREADS * 15 * 1024 * 1024; template inline TantivyDataType @@ -81,15 +33,14 @@ guess_data_type() { typeid(T).name()); } +// TODO: should split this into IndexWriter & IndexReader. struct TantivyIndexWrapper { using IndexWriter = void*; using IndexReader = void*; - TantivyIndexWrapper() = default; + NO_COPY_OR_ASSIGN(TantivyIndexWrapper); - TantivyIndexWrapper(TantivyIndexWrapper&) = delete; - TantivyIndexWrapper& - operator=(TantivyIndexWrapper&) = delete; + TantivyIndexWrapper() = default; TantivyIndexWrapper(TantivyIndexWrapper&& other) noexcept { writer_ = other.writer_; @@ -120,11 +71,19 @@ struct TantivyIndexWrapper { TantivyIndexWrapper(const char* field_name, TantivyDataType data_type, - const char* path) { - writer_ = tantivy_create_index(field_name, data_type, path); + const char* path, + uintptr_t num_threads = DEFAULT_NUM_THREADS, + uintptr_t overall_memory_budget_in_bytes = + DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) { + writer_ = tantivy_create_index(field_name, + data_type, + path, + num_threads, + overall_memory_budget_in_bytes); path_ = std::string(path); } + // load index. create index reader. explicit TantivyIndexWrapper(const char* path) { assert(tantivy_index_exist(path)); reader_ = tantivy_load_index(path); @@ -137,49 +96,51 @@ struct TantivyIndexWrapper { template void - add_data(const T* array, uintptr_t len) { + add_data(const T* array, uintptr_t len, int64_t offset_begin) { assert(!finished_); if constexpr (std::is_same_v) { - tantivy_index_add_bools(writer_, array, len); + tantivy_index_add_bools(writer_, array, len, offset_begin); return; } if constexpr (std::is_same_v) { - tantivy_index_add_int8s(writer_, array, len); + tantivy_index_add_int8s(writer_, array, len, offset_begin); return; } if constexpr (std::is_same_v) { - tantivy_index_add_int16s(writer_, array, len); + tantivy_index_add_int16s(writer_, array, len, offset_begin); return; } if constexpr (std::is_same_v) { - tantivy_index_add_int32s(writer_, array, len); + tantivy_index_add_int32s(writer_, array, len, offset_begin); return; } if constexpr (std::is_same_v) { - tantivy_index_add_int64s(writer_, array, len); + tantivy_index_add_int64s(writer_, array, len, offset_begin); return; } if constexpr (std::is_same_v) { - tantivy_index_add_f32s(writer_, array, len); + tantivy_index_add_f32s(writer_, array, len, offset_begin); return; } if constexpr (std::is_same_v) { - tantivy_index_add_f64s(writer_, array, len); + tantivy_index_add_f64s(writer_, array, len, offset_begin); return; } if constexpr (std::is_same_v) { // TODO: not very efficient, a lot of overhead due to rust-ffi call. for (uintptr_t i = 0; i < len; i++) { - tantivy_index_add_keyword( - writer_, static_cast(array)[i].c_str()); + tantivy_index_add_string( + writer_, + static_cast(array)[i].c_str(), + offset_begin + i); } return; } @@ -190,41 +151,41 @@ struct TantivyIndexWrapper { template void - add_multi_data(const T* array, uintptr_t len) { + add_multi_data(const T* array, uintptr_t len, int64_t offset) { assert(!finished_); if constexpr (std::is_same_v) { - tantivy_index_add_multi_bools(writer_, array, len); + tantivy_index_add_multi_bools(writer_, array, len, offset); return; } if constexpr (std::is_same_v) { - tantivy_index_add_multi_int8s(writer_, array, len); + tantivy_index_add_multi_int8s(writer_, array, len, offset); return; } if constexpr (std::is_same_v) { - tantivy_index_add_multi_int16s(writer_, array, len); + tantivy_index_add_multi_int16s(writer_, array, len, offset); return; } if constexpr (std::is_same_v) { - tantivy_index_add_multi_int32s(writer_, array, len); + tantivy_index_add_multi_int32s(writer_, array, len, offset); return; } if constexpr (std::is_same_v) { - tantivy_index_add_multi_int64s(writer_, array, len); + tantivy_index_add_multi_int64s(writer_, array, len, offset); return; } if constexpr (std::is_same_v) { - tantivy_index_add_multi_f32s(writer_, array, len); + tantivy_index_add_multi_f32s(writer_, array, len, offset); return; } if constexpr (std::is_same_v) { - tantivy_index_add_multi_f64s(writer_, array, len); + tantivy_index_add_multi_f64s(writer_, array, len, offset); return; } @@ -233,7 +194,8 @@ struct TantivyIndexWrapper { for (uintptr_t i = 0; i < len; i++) { views.push_back(array[i].c_str()); } - tantivy_index_add_multi_keywords(writer_, views.data(), len); + tantivy_index_add_multi_keywords( + writer_, views.data(), len, offset); return; } @@ -244,12 +206,14 @@ struct TantivyIndexWrapper { inline void finish() { - if (!finished_) { - tantivy_finish_index(writer_); - writer_ = nullptr; - reader_ = tantivy_load_index(path_.c_str()); - finished_ = true; + if (finished_) { + return; } + + tantivy_finish_index(writer_); + writer_ = nullptr; + reader_ = tantivy_load_index(path_.c_str()); + finished_ = true; } inline uint32_t diff --git a/internal/core/thirdparty/tantivy/test.cpp b/internal/core/thirdparty/tantivy/test.cpp index a380481042..4ba283d5ff 100644 --- a/internal/core/thirdparty/tantivy/test.cpp +++ b/internal/core/thirdparty/tantivy/test.cpp @@ -33,7 +33,7 @@ run() { T arr[] = {1, 2, 3, 4, 5, 6}; auto l = sizeof(arr) / sizeof(T); - w.add_data(arr, l); + w.add_data(arr, l, 0); w.finish(); @@ -83,7 +83,7 @@ run() { bool arr[] = {true, false, false, true, false, true}; auto l = sizeof(arr) / sizeof(bool); - w.add_data(arr, l); + w.add_data(arr, l, 0); w.finish(); @@ -118,7 +118,7 @@ run() { std::vector arr = {"a", "b", "aaa", "abbb"}; auto l = arr.size(); - w.add_data(arr.data(), l); + w.add_data(arr.data(), l, 0); w.finish(); @@ -188,7 +188,7 @@ test_32717() { inverted[n].insert(i); } - w.add_data(arr.data(), l); + w.add_data(arr.data(), l, 0); w.finish(); assert(w.count() == l); @@ -233,8 +233,9 @@ test_array_int() { {10, 50, 60}, }; + int64_t offset = 0; for (const auto& arr : vec_of_array) { - w.add_multi_data(arr.data(), arr.size()); + w.add_multi_data(arr.data(), arr.size(), offset++); } w.finish(); @@ -263,8 +264,9 @@ test_array_string() { {"10", "50", "60"}, }; + int64_t offset = 0; for (const auto& arr : vec_of_array) { - w.add_multi_data(arr.data(), arr.size()); + w.add_multi_data(arr.data(), arr.size(), offset++); } w.finish(); diff --git a/internal/core/unittest/test_array_bitmap_index.cpp b/internal/core/unittest/test_array_bitmap_index.cpp index d0a104c5bb..c40652c39b 100644 --- a/internal/core/unittest/test_array_bitmap_index.cpp +++ b/internal/core/unittest/test_array_bitmap_index.cpp @@ -230,14 +230,16 @@ class ArrayBitmapIndexTest : public testing::Test { config["insert_files"] = std::vector{log_path}; config["bitmap_cardinality_limit"] = "100"; - auto build_index = - indexbuilder::IndexFactory::GetInstance().CreateIndex( - DataType::ARRAY, config, ctx); - build_index->Build(); + { + auto build_index = + indexbuilder::IndexFactory::GetInstance().CreateIndex( + DataType::ARRAY, config, ctx); + build_index->Build(); - auto binary_set = build_index->Upload(); - for (const auto& [key, _] : binary_set.binary_map_) { - index_files.push_back(key); + auto binary_set = build_index->Upload(); + for (const auto& [key, _] : binary_set.binary_map_) { + index_files.push_back(key); + } } index::CreateIndexInfo index_info{}; diff --git a/internal/core/unittest/test_hybrid_index.cpp b/internal/core/unittest/test_hybrid_index.cpp index b4e3455bdf..7d9f47942c 100644 --- a/internal/core/unittest/test_hybrid_index.cpp +++ b/internal/core/unittest/test_hybrid_index.cpp @@ -143,14 +143,16 @@ class HybridIndexTestV1 : public testing::Test { config["insert_files"] = std::vector{log_path}; config["bitmap_cardinality_limit"] = "1000"; - auto build_index = - indexbuilder::IndexFactory::GetInstance().CreateIndex( - type_, config, ctx); - build_index->Build(); + { + auto build_index = + indexbuilder::IndexFactory::GetInstance().CreateIndex( + type_, config, ctx); + build_index->Build(); - auto binary_set = build_index->Upload(); - for (const auto& [key, _] : binary_set.binary_map_) { - index_files.push_back(key); + auto binary_set = build_index->Upload(); + for (const auto& [key, _] : binary_set.binary_map_) { + index_files.push_back(key); + } } index::CreateIndexInfo index_info{};