2021-12-10 20:59:42 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 13:47:10 +08:00
// with the License. You may obtain a copy of the License at
//
2021-12-10 20:59:42 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 13:47:10 +08:00
//
2021-12-10 20:59:42 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 13:47:10 +08:00
2021-01-16 10:12:14 +08:00
package querynode
2020-08-25 15:45:19 +08:00
2020-08-29 17:42:41 +08:00
/ *
2020-10-23 18:01:24 +08:00
# cgo CFLAGS : - I $ { SRCDIR } / . . / core / output / include
2020-10-31 15:11:47 +08:00
# cgo LDFLAGS : - L $ { SRCDIR } / . . / core / output / lib - lmilvus_segcore - Wl , - rpath = $ { SRCDIR } / . . / core / output / lib
2020-08-29 17:42:41 +08:00
2020-11-25 10:31:51 +08:00
# include "segcore/collection_c.h"
# include "segcore/plan_c.h"
# include "segcore/reduce_c.h"
2020-08-29 17:42:41 +08:00
* /
2020-08-25 15:45:19 +08:00
import "C"
2020-08-28 17:29:26 +08:00
import (
2021-08-14 11:18:10 +08:00
"bytes"
"encoding/binary"
2021-06-09 11:37:55 +08:00
"errors"
2021-03-22 16:36:10 +08:00
"fmt"
2020-11-04 17:58:43 +08:00
"strconv"
2021-01-12 18:03:24 +08:00
"sync"
2020-11-05 10:52:50 +08:00
"unsafe"
2020-11-04 17:58:43 +08:00
2021-08-28 10:12:00 +08:00
"github.com/bits-and-blooms/bloom/v3"
2021-11-15 11:05:09 +08:00
"github.com/golang/protobuf/proto"
2020-11-09 16:27:11 +08:00
"github.com/stretchr/testify/assert"
2021-03-05 09:21:35 +08:00
"go.uber.org/zap"
2020-11-09 16:27:11 +08:00
2021-11-02 18:16:32 +08:00
"github.com/milvus-io/milvus/internal/common"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
2021-07-16 17:19:55 +08:00
"github.com/milvus-io/milvus/internal/proto/datapb"
2021-08-14 11:18:10 +08:00
"github.com/milvus-io/milvus/internal/proto/schemapb"
2021-06-15 14:43:57 +08:00
"github.com/milvus-io/milvus/internal/proto/segcorepb"
2021-08-14 11:18:10 +08:00
"github.com/milvus-io/milvus/internal/storage"
2021-11-23 14:25:15 +08:00
"github.com/milvus-io/milvus/internal/util/typeutil"
2020-08-28 17:29:26 +08:00
)
2021-03-10 22:06:22 +08:00
// segmentType identifies the kind of a segment held by the query node.
// Inserts are only applied to growing segments, field data is only loaded into
// sealed segments, and a sealed segment becomes indexing once an index is loaded.
type segmentType int32

// Segment kinds; the numeric values follow declaration order starting at zero.
const (
	segmentTypeInvalid  segmentType = 0
	segmentTypeGrowing  segmentType = 1
	segmentTypeSealed   segmentType = 2
	segmentTypeIndexing segmentType = 3
)
2021-10-18 20:08:42 +08:00
// bloomFilterSize is the expected element count passed to bloom.NewWithEstimates
// when a segment's primary-key filter is created.
const bloomFilterSize uint = 100000

// maxBloomFalsePositive is the target false-positive rate passed to
// bloom.NewWithEstimates for a segment's primary-key filter.
const maxBloomFalsePositive float64 = 0.005
2021-11-10 23:48:06 +08:00
// VectorFieldInfo contains binlog info of vector field
2021-07-16 17:19:55 +08:00
type VectorFieldInfo struct {
2021-07-24 09:25:22 +08:00
fieldBinlog * datapb . FieldBinlog
2021-07-16 17:19:55 +08:00
}
func newVectorFieldInfo ( fieldBinlog * datapb . FieldBinlog ) * VectorFieldInfo {
return & VectorFieldInfo {
2021-07-24 09:25:22 +08:00
fieldBinlog : fieldBinlog ,
2021-07-16 17:19:55 +08:00
}
}
2021-11-03 23:50:14 +08:00
// Segment is a wrapper of the underlying C-structure segment.
2020-08-25 15:45:19 +08:00
type Segment struct {
2021-07-29 16:03:22 +08:00
segPtrMu sync . RWMutex // guards segmentPtr
2021-01-30 16:02:10 +08:00
segmentPtr C . CSegmentInterface
2021-01-13 10:40:46 +08:00
segmentID UniqueID
2021-01-20 09:36:50 +08:00
partitionID UniqueID
2021-01-13 10:40:46 +08:00
collectionID UniqueID
2021-06-15 12:41:40 +08:00
onService bool
2021-06-15 20:06:10 +08:00
vChannelID Channel
2021-01-13 10:40:46 +08:00
lastMemSize int64
lastRowCount int64
2021-06-15 12:41:40 +08:00
once sync . Once // guards enableIndex
enableIndex bool
2021-03-10 14:51:00 +08:00
2021-01-13 10:40:46 +08:00
rmMutex sync . Mutex // guards recentlyModified
2020-11-05 10:52:50 +08:00
recentlyModified bool
2021-01-13 10:40:46 +08:00
2021-01-30 16:02:10 +08:00
typeMu sync . Mutex // guards builtIndex
2021-03-10 22:06:22 +08:00
segmentType segmentType
2021-01-30 16:02:10 +08:00
2021-02-04 11:40:14 +08:00
paramMutex sync . RWMutex // guards index
2021-09-14 10:25:26 +08:00
indexInfos map [ FieldID ] * indexInfo
2021-07-16 17:19:55 +08:00
2021-07-24 09:25:22 +08:00
idBinlogRowSizes [ ] int64
2021-07-16 17:19:55 +08:00
vectorFieldMutex sync . RWMutex // guards vectorFieldInfos
vectorFieldInfos map [ UniqueID ] * VectorFieldInfo
2021-08-28 10:12:00 +08:00
pkFilter * bloom . BloomFilter // bloom filter of pk inside a segment
2020-08-25 15:45:19 +08:00
}
2021-11-03 23:50:14 +08:00
// ID returns the identity number.
2020-11-09 16:27:11 +08:00
func ( s * Segment ) ID ( ) UniqueID {
return s . segmentID
}
2021-03-10 14:51:00 +08:00
// setEnableIndex records whether index usage is enabled for the segment.
// Only the first call takes effect; sync.Once ignores every later call.
func (s *Segment) setEnableIndex(enable bool) {
	s.once.Do(func() {
		s.enableIndex = enable
	})
}
// getEnableIndex reports the value recorded by setEnableIndex (false if it was never called).
func (s *Segment) getEnableIndex() bool {
	enabled := s.enableIndex
	return enabled
}
2021-07-24 09:25:22 +08:00
// setIDBinlogRowSizes stores the per-binlog row counts; the slice is kept, not copied.
func (s *Segment) setIDBinlogRowSizes(sizes []int64) {
	s.idBinlogRowSizes = sizes
}
// getIDBinlogRowSizes returns the per-binlog row counts stored by setIDBinlogRowSizes.
func (s *Segment) getIDBinlogRowSizes() []int64 {
	sizes := s.idBinlogRowSizes
	return sizes
}
2021-01-30 16:02:10 +08:00
func ( s * Segment ) setRecentlyModified ( modify bool ) {
2021-01-13 10:40:46 +08:00
s . rmMutex . Lock ( )
defer s . rmMutex . Unlock ( )
2021-01-12 18:03:24 +08:00
s . recentlyModified = modify
}
2021-01-30 16:02:10 +08:00
func ( s * Segment ) getRecentlyModified ( ) bool {
2021-01-13 10:40:46 +08:00
s . rmMutex . Lock ( )
defer s . rmMutex . Unlock ( )
2021-01-12 18:03:24 +08:00
return s . recentlyModified
}
2021-01-30 16:02:10 +08:00
// setType updates the segment type under typeMu.
func (s *Segment) setType(segType segmentType) {
	s.typeMu.Lock()
	s.segmentType = segType
	s.typeMu.Unlock()
}
func ( s * Segment ) getType ( ) segmentType {
s . typeMu . Lock ( )
defer s . typeMu . Unlock ( )
return s . segmentType
2020-11-09 16:27:11 +08:00
}
2021-06-15 12:41:40 +08:00
// getOnService returns the onService flag (read without locking).
func (s *Segment) getOnService() bool {
	flag := s.onService
	return flag
}
// setOnService sets the onService flag (written without locking).
func (s *Segment) setOnService(onService bool) {
	s.onService = onService
}
2021-07-16 17:19:55 +08:00
// setVectorFieldInfo registers (or replaces) the binlog info of a vector field.
func (s *Segment) setVectorFieldInfo(fieldID UniqueID, info *VectorFieldInfo) {
	mu := &s.vectorFieldMutex
	mu.Lock()
	defer mu.Unlock()
	s.vectorFieldInfos[fieldID] = info
}
// getVectorFieldInfo returns the binlog info registered for a vector field,
// or an error if no info was registered for fieldID.
func (s *Segment) getVectorFieldInfo(fieldID UniqueID) (*VectorFieldInfo, error) {
	// Read-only access: a shared lock is enough and lets concurrent lookups proceed.
	s.vectorFieldMutex.RLock()
	defer s.vectorFieldMutex.RUnlock()
	info, ok := s.vectorFieldInfos[fieldID]
	if !ok {
		return nil, errors.New("Invalid fieldID " + strconv.Itoa(int(fieldID)))
	}
	return info, nil
}
2021-09-14 10:25:26 +08:00
func newSegment ( collection * Collection , segmentID UniqueID , partitionID UniqueID , collectionID UniqueID , vChannelID Channel , segType segmentType , onService bool ) * Segment {
2021-01-20 09:36:50 +08:00
/ *
CSegmentInterface
NewSegment ( CCollection collection , uint64_t segment_id , SegmentType seg_type ) ;
* /
2021-02-07 21:26:03 +08:00
var segmentPtr C . CSegmentInterface
switch segType {
2021-03-10 22:06:22 +08:00
case segmentTypeInvalid :
2021-07-31 10:47:22 +08:00
log . Warn ( "illegal segment type when create segment" )
2021-02-07 21:26:03 +08:00
return nil
2021-03-10 22:06:22 +08:00
case segmentTypeSealed :
2021-11-15 16:57:09 +08:00
segmentPtr = C . NewSegment ( collection . collectionPtr , C . Sealed )
2021-03-10 22:06:22 +08:00
case segmentTypeGrowing :
2021-11-15 16:57:09 +08:00
segmentPtr = C . NewSegment ( collection . collectionPtr , C . Growing )
2021-02-07 21:26:03 +08:00
default :
2021-07-31 10:47:22 +08:00
log . Warn ( "illegal segment type when create segment" )
2021-02-07 21:26:03 +08:00
return nil
}
2021-09-11 14:40:01 +08:00
log . Debug ( "create segment" , zap . Int64 ( "segmentID" , segmentID ) , zap . Int32 ( "segmentType" , int32 ( segType ) ) )
2021-03-05 09:21:35 +08:00
2021-07-16 17:19:55 +08:00
var segment = & Segment {
segmentPtr : segmentPtr ,
segmentType : segType ,
segmentID : segmentID ,
partitionID : partitionID ,
collectionID : collectionID ,
vChannelID : vChannelID ,
onService : onService ,
indexInfos : make ( map [ int64 ] * indexInfo ) ,
vectorFieldInfos : make ( map [ UniqueID ] * VectorFieldInfo ) ,
2021-10-18 20:08:42 +08:00
pkFilter : bloom . NewWithEstimates ( bloomFilterSize , maxBloomFalsePositive ) ,
2021-01-20 09:36:50 +08:00
}
2021-07-16 17:19:55 +08:00
return segment
2021-01-20 09:36:50 +08:00
}
2020-11-09 16:27:11 +08:00
func deleteSegment ( segment * Segment ) {
/ *
void
2021-01-19 11:37:16 +08:00
deleteSegment ( CSegmentInterface segment ) ;
2020-11-09 16:27:11 +08:00
* /
2021-08-12 14:14:08 +08:00
if segment . segmentPtr == nil {
return
}
2021-07-29 16:03:22 +08:00
segment . segPtrMu . Lock ( )
defer segment . segPtrMu . Unlock ( )
2020-11-09 16:27:11 +08:00
cPtr := segment . segmentPtr
C . DeleteSegment ( cPtr )
2021-02-02 19:54:31 +08:00
segment . segmentPtr = nil
2021-03-05 09:21:35 +08:00
2021-12-10 18:51:08 +08:00
log . Debug ( "delete segment from memory" , zap . Int64 ( "collectionID" , segment . collectionID ) , zap . Int64 ( "partitionID" , segment . partitionID ) , zap . Int64 ( "segmentID" , segment . ID ( ) ) )
2021-03-05 09:21:35 +08:00
2021-02-02 19:54:31 +08:00
segment = nil
2020-11-09 16:27:11 +08:00
}
2020-11-05 10:52:50 +08:00
func ( s * Segment ) getRowCount ( ) int64 {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
getRowCount ( CSegmentInterface c_segment ) ;
2020-09-03 19:58:33 +08:00
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( )
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return - 1
}
2020-11-09 16:27:11 +08:00
var rowCount = C . GetRowCount ( s . segmentPtr )
2020-09-03 19:58:33 +08:00
return int64 ( rowCount )
2020-08-25 15:45:19 +08:00
}
2020-11-05 10:52:50 +08:00
func ( s * Segment ) getDeletedCount ( ) int64 {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
getDeletedCount ( CSegmentInterface c_segment ) ;
2020-09-03 19:58:33 +08:00
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( )
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return - 1
}
2020-11-09 16:27:11 +08:00
var deletedCount = C . GetDeletedCount ( s . segmentPtr )
2020-09-03 19:58:33 +08:00
return int64 ( deletedCount )
}
2020-11-05 10:52:50 +08:00
func ( s * Segment ) getMemSize ( ) int64 {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
GetMemoryUsageInBytes ( CSegmentInterface c_segment ) ;
2020-09-21 15:10:54 +08:00
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( )
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return - 1
}
2020-11-09 16:27:11 +08:00
var memoryUsageInBytes = C . GetMemoryUsageInBytes ( s . segmentPtr )
2020-09-21 15:10:54 +08:00
2020-10-29 19:55:57 +08:00
return int64 ( memoryUsageInBytes )
2020-09-16 15:21:10 +08:00
}
2021-07-13 22:20:33 +08:00
func ( s * Segment ) search ( plan * SearchPlan ,
2021-03-30 22:16:58 +08:00
searchRequests [ ] * searchRequest ,
2021-01-20 09:36:50 +08:00
timestamp [ ] Timestamp ) ( * SearchResult , error ) {
/ *
CStatus
Search ( void * plan ,
void * placeholder_groups ,
uint64_t * timestamps ,
int num_groups ,
long int * result_ids ,
float * result_distances ) ;
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( )
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return nil , errors . New ( "null seg core pointer" )
}
2021-01-20 09:36:50 +08:00
cPlaceholderGroups := make ( [ ] C . CPlaceholderGroup , 0 )
2021-03-30 22:16:58 +08:00
for _ , pg := range searchRequests {
2021-01-20 09:36:50 +08:00
cPlaceholderGroups = append ( cPlaceholderGroups , ( * pg ) . cPlaceholderGroup )
}
var searchResult SearchResult
2021-07-01 10:32:15 +08:00
ts := C . uint64_t ( timestamp [ 0 ] )
cPlaceHolderGroup := cPlaceholderGroups [ 0 ]
2021-01-20 09:36:50 +08:00
2021-03-10 22:06:22 +08:00
log . Debug ( "do search on segment" , zap . Int64 ( "segmentID" , s . segmentID ) , zap . Int32 ( "segmentType" , int32 ( s . segmentType ) ) )
2021-11-11 10:18:42 +08:00
status := C . Search ( s . segmentPtr , plan . cSearchPlan , cPlaceHolderGroup , ts , & searchResult . cSearchResult )
if err := HandleCStatus ( & status , "Search failed" ) ; err != nil {
return nil , err
2021-01-20 09:36:50 +08:00
}
return & searchResult , nil
}
2021-11-15 11:05:09 +08:00
// HandleCProto deals with a result proto returned from CGO. It copies the
// C-allocated blob in cRes into Go memory and unmarshals the copy into msg.
// The C buffer is freed before returning.
func HandleCProto(cRes *C.CProto, msg proto.Message) error {
	// The blob is created on the C side and its memory is managed manually,
	// so it is released here (deferred until after the copy and unmarshal).
	defer C.free(cRes.proto_blob)
	blob := C.GoBytes(unsafe.Pointer(cRes.proto_blob), C.int32_t(cRes.proto_size))
	return proto.Unmarshal(blob, msg)
}
2021-11-10 17:15:37 +08:00
func ( s * Segment ) retrieve ( plan * RetrievePlan ) ( * segcorepb . RetrieveResults , error ) {
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( )
if s . segmentPtr == nil {
return nil , errors . New ( "null seg core pointer" )
}
2021-11-12 10:04:49 +08:00
var retrieveResult RetrieveResult
ts := C . uint64_t ( plan . Timestamp )
status := C . Retrieve ( s . segmentPtr , plan . cRetrievePlan , ts , & retrieveResult . cRetrieveResult )
if err := HandleCStatus ( & status , "Retrieve failed" ) ; err != nil {
return nil , err
}
2021-06-15 14:43:57 +08:00
result := new ( segcorepb . RetrieveResults )
2021-11-12 10:04:49 +08:00
if err := HandleCProto ( & retrieveResult . cRetrieveResult , result ) ; err != nil {
2021-06-04 10:38:34 +08:00
return nil , err
}
return result , nil
}
2021-08-18 16:30:11 +08:00
func ( s * Segment ) fillVectorFieldsData ( collectionID UniqueID ,
vcm storage . ChunkManager , result * segcorepb . RetrieveResults ) error {
2021-08-14 11:18:10 +08:00
for _ , fieldData := range result . FieldsData {
2021-11-23 14:25:15 +08:00
if ! typeutil . IsVectorType ( fieldData . Type ) {
2021-08-14 11:18:10 +08:00
continue
}
vecFieldInfo , err := s . getVectorFieldInfo ( fieldData . FieldId )
if err != nil {
continue
}
2021-11-23 14:25:15 +08:00
// If the vector field doesn't have index. Vector data is in memory for
// brute force search. No need to download data from remote.
if _ , ok := s . indexInfos [ fieldData . FieldId ] ; ! ok {
continue
}
2021-08-14 11:18:10 +08:00
dim := fieldData . GetVectors ( ) . GetDim ( )
2021-11-23 14:25:15 +08:00
log . Debug ( "FillVectorFieldData" , zap . Int64 ( "fieldId" , fieldData . FieldId ) ,
zap . Any ( "datatype" , fieldData . Type ) , zap . Int64 ( "dim" , dim ) )
2021-08-14 11:18:10 +08:00
for i , offset := range result . Offset {
var vecPath string
for index , idBinlogRowSize := range s . idBinlogRowSizes {
if offset < idBinlogRowSize {
vecPath = vecFieldInfo . fieldBinlog . Binlogs [ index ]
break
} else {
offset -= idBinlogRowSize
}
}
2021-11-23 14:25:15 +08:00
log . Debug ( "FillVectorFieldData" , zap . String ( "path" , vecPath ) )
2021-08-14 11:18:10 +08:00
switch fieldData . Type {
case schemapb . DataType_BinaryVector :
rowBytes := dim / 8
x := fieldData . GetVectors ( ) . GetData ( ) . ( * schemapb . VectorField_BinaryVector )
content := make ( [ ] byte , rowBytes )
2021-11-23 14:25:15 +08:00
if _ , err = vcm . ReadAt ( vecPath , content , offset * rowBytes ) ; err != nil {
2021-08-14 11:18:10 +08:00
return err
}
resultLen := dim / 8
copy ( x . BinaryVector [ i * int ( resultLen ) : ( i + 1 ) * int ( resultLen ) ] , content )
case schemapb . DataType_FloatVector :
x := fieldData . GetVectors ( ) . GetData ( ) . ( * schemapb . VectorField_FloatVector )
rowBytes := dim * 4
content := make ( [ ] byte , rowBytes )
2021-11-23 14:25:15 +08:00
if _ , err = vcm . ReadAt ( vecPath , content , offset * rowBytes ) ; err != nil {
2021-08-14 11:18:10 +08:00
return err
}
floatResult := make ( [ ] float32 , dim )
buf := bytes . NewReader ( content )
2021-11-23 14:25:15 +08:00
if err = binary . Read ( buf , common . Endian , & floatResult ) ; err != nil {
2021-08-14 11:18:10 +08:00
return err
}
resultLen := dim
copy ( x . FloatVector . Data [ i * int ( resultLen ) : ( i + 1 ) * int ( resultLen ) ] , floatResult )
}
}
}
return nil
}
2021-04-07 18:29:19 +08:00
//-------------------------------------------------------------------------------------- index info interface
// setIndexName sets the index name of fieldID; the field's index info must already exist.
func (s *Segment) setIndexName(fieldID int64, name string) error {
	s.paramMutex.Lock()
	defer s.paramMutex.Unlock()
	info, ok := s.indexInfos[fieldID]
	if !ok {
		return errors.New("index info hasn't been init")
	}
	info.setIndexName(name)
	return nil
}
func ( s * Segment ) setIndexParam ( fieldID int64 , indexParams map [ string ] string ) error {
2021-01-20 09:36:50 +08:00
s . paramMutex . Lock ( )
defer s . paramMutex . Unlock ( )
2021-04-07 18:29:19 +08:00
if indexParams == nil {
2021-02-02 19:54:31 +08:00
return errors . New ( "empty loadIndexMsg's indexParam" )
2021-01-20 09:36:50 +08:00
}
2021-04-07 18:29:19 +08:00
if _ , ok := s . indexInfos [ fieldID ] ; ! ok {
return errors . New ( "index info hasn't been init" )
2021-01-20 09:36:50 +08:00
}
2021-04-07 18:29:19 +08:00
s . indexInfos [ fieldID ] . setIndexParams ( indexParams )
2021-01-20 09:36:50 +08:00
return nil
}
2021-04-07 18:29:19 +08:00
// setIndexPaths sets the index file paths of fieldID; the field's index info must already exist.
func (s *Segment) setIndexPaths(fieldID int64, indexPaths []string) error {
	s.paramMutex.Lock()
	defer s.paramMutex.Unlock()
	info, ok := s.indexInfos[fieldID]
	if !ok {
		return errors.New("index info hasn't been init")
	}
	info.setIndexPaths(indexPaths)
	return nil
}
// setIndexID sets the index ID of fieldID; the field's index info must already exist.
func (s *Segment) setIndexID(fieldID int64, id UniqueID) error {
	s.paramMutex.Lock()
	defer s.paramMutex.Unlock()
	info, ok := s.indexInfos[fieldID]
	if !ok {
		return errors.New("index info hasn't been init")
	}
	info.setIndexID(id)
	return nil
}
// setBuildID sets the index build ID of fieldID; the field's index info must already exist.
func (s *Segment) setBuildID(fieldID int64, id UniqueID) error {
	s.paramMutex.Lock()
	defer s.paramMutex.Unlock()
	info, ok := s.indexInfos[fieldID]
	if !ok {
		return errors.New("index info hasn't been init")
	}
	info.setBuildID(id)
	return nil
}
func ( s * Segment ) getIndexName ( fieldID int64 ) string {
2021-12-07 10:01:52 +08:00
s . paramMutex . RLock ( )
defer s . paramMutex . RUnlock ( )
2021-04-07 18:29:19 +08:00
if _ , ok := s . indexInfos [ fieldID ] ; ! ok {
return ""
}
return s . indexInfos [ fieldID ] . getIndexName ( )
}
func ( s * Segment ) getIndexID ( fieldID int64 ) UniqueID {
2021-12-06 15:23:41 +08:00
s . paramMutex . RLock ( )
defer s . paramMutex . RUnlock ( )
2021-04-07 18:29:19 +08:00
if _ , ok := s . indexInfos [ fieldID ] ; ! ok {
return - 1
}
return s . indexInfos [ fieldID ] . getIndexID ( )
}
// getBuildID returns the index build ID of fieldID, or -1 if it has no index info.
func (s *Segment) getBuildID(fieldID int64) UniqueID {
	// Read-only access: use the shared lock, like the other index getters.
	s.paramMutex.RLock()
	defer s.paramMutex.RUnlock()
	if info, ok := s.indexInfos[fieldID]; ok {
		return info.getBuildID()
	}
	return -1
}
func ( s * Segment ) getIndexPaths ( fieldID int64 ) [ ] string {
2021-12-07 10:03:47 +08:00
s . paramMutex . RLock ( )
defer s . paramMutex . RUnlock ( )
2021-04-07 18:29:19 +08:00
if _ , ok := s . indexInfos [ fieldID ] ; ! ok {
return nil
}
return s . indexInfos [ fieldID ] . getIndexPaths ( )
}
func ( s * Segment ) getIndexParams ( fieldID int64 ) map [ string ] string {
2021-12-06 15:21:46 +08:00
s . paramMutex . RLock ( )
defer s . paramMutex . RUnlock ( )
2021-04-07 18:29:19 +08:00
if _ , ok := s . indexInfos [ fieldID ] ; ! ok {
return nil
}
return s . indexInfos [ fieldID ] . getIndexParams ( )
}
2021-01-29 15:22:24 +08:00
func ( s * Segment ) matchIndexParam ( fieldID int64 , indexParams indexParam ) bool {
2021-01-20 09:36:50 +08:00
s . paramMutex . RLock ( )
defer s . paramMutex . RUnlock ( )
2021-04-07 18:29:19 +08:00
if _ , ok := s . indexInfos [ fieldID ] ; ! ok {
return false
}
fieldIndexParam := s . indexInfos [ fieldID ] . getIndexParams ( )
2021-01-20 09:36:50 +08:00
if fieldIndexParam == nil {
return false
}
2021-09-09 17:36:02 +08:00
paramSize := len ( s . indexInfos [ fieldID ] . indexParams )
2021-01-20 09:36:50 +08:00
matchCount := 0
2021-01-29 15:22:24 +08:00
for k , v := range indexParams {
value , ok := fieldIndexParam [ k ]
2021-01-20 09:36:50 +08:00
if ! ok {
return false
}
2021-01-29 15:22:24 +08:00
if v != value {
2021-01-20 09:36:50 +08:00
return false
}
matchCount ++
}
return paramSize == matchCount
}
2021-12-03 11:29:33 +08:00
func ( s * Segment ) setIndexInfo ( fieldID int64 , info * indexInfo ) {
2021-12-01 22:26:10 +08:00
s . paramMutex . Lock ( )
defer s . paramMutex . Unlock ( )
2021-04-07 18:29:19 +08:00
s . indexInfos [ fieldID ] = info
}
// checkIndexReady reports whether fieldID has index info whose getReadyLoad is true.
func (s *Segment) checkIndexReady(fieldID int64) bool {
	s.paramMutex.RLock()
	defer s.paramMutex.RUnlock()
	info, ok := s.indexInfos[fieldID]
	return ok && info.getReadyLoad()
}
2021-10-19 20:18:47 +08:00
func ( s * Segment ) updateBloomFilter ( pks [ ] int64 ) {
buf := make ( [ ] byte , 8 )
for _ , pk := range pks {
2021-11-02 18:16:32 +08:00
common . Endian . PutUint64 ( buf , uint64 ( pk ) )
2021-10-19 20:18:47 +08:00
s . pkFilter . Add ( buf )
}
}
2021-01-20 09:36:50 +08:00
//-------------------------------------------------------------------------------------- interfaces for growing segment
2021-03-12 19:23:06 +08:00
func ( s * Segment ) segmentPreInsert ( numOfRecords int ) ( int64 , error ) {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
PreInsert ( CSegmentInterface c_segment , long int size ) ;
2020-09-09 15:24:07 +08:00
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( ) // thread safe guaranteed by segCore, use RLock
2021-06-15 12:41:40 +08:00
if s . segmentType != segmentTypeGrowing {
2021-03-12 19:23:06 +08:00
return 0 , nil
}
var offset int64
2021-03-31 16:16:58 +08:00
cOffset := ( * C . long ) ( & offset )
status := C . PreInsert ( s . segmentPtr , C . long ( int64 ( numOfRecords ) ) , cOffset )
2021-11-11 10:18:42 +08:00
if err := HandleCStatus ( & status , "PreInsert failed" ) ; err != nil {
return 0 , err
2021-03-12 19:23:06 +08:00
}
return offset , nil
2020-09-09 15:24:07 +08:00
}
2020-11-05 10:52:50 +08:00
func ( s * Segment ) segmentPreDelete ( numOfRecords int ) int64 {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
PreDelete ( CSegmentInterface c_segment , long int size ) ;
2020-09-09 15:24:07 +08:00
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( ) // thread safe guaranteed by segCore, use RLock
2020-11-09 16:27:11 +08:00
var offset = C . PreDelete ( s . segmentPtr , C . long ( int64 ( numOfRecords ) ) )
2020-09-09 15:24:07 +08:00
2020-09-12 16:57:37 +08:00
return int64 ( offset )
2020-09-09 15:24:07 +08:00
}
2021-09-07 15:45:59 +08:00
// TODO: remove reference of slice
2020-11-05 10:52:50 +08:00
func ( s * Segment ) segmentInsert ( offset int64 , entityIDs * [ ] UniqueID , timestamps * [ ] Timestamp , records * [ ] * commonpb . Blob ) error {
2020-09-21 18:16:06 +08:00
/ *
2020-11-26 16:01:31 +08:00
CStatus
2021-01-19 11:37:16 +08:00
Insert ( CSegmentInterface c_segment ,
2020-10-24 10:45:57 +08:00
long int reserved_offset ,
signed long int size ,
const long * primary_keys ,
const unsigned long * timestamps ,
void * raw_data ,
int sizeof_per_row ,
signed long int count ) ;
2020-09-02 16:23:50 +08:00
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( ) // thread safe guaranteed by segCore, use RLock
2021-06-15 12:41:40 +08:00
if s . segmentType != segmentTypeGrowing {
2021-03-26 05:08:08 +08:00
return nil
}
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
2021-10-18 20:08:42 +08:00
2020-09-08 10:39:09 +08:00
// Blobs to one big blob
2020-11-05 10:52:50 +08:00
var numOfRow = len ( * entityIDs )
var sizeofPerRow = len ( ( * records ) [ 0 ] . Value )
assert . Equal ( nil , numOfRow , len ( * records ) )
2021-10-15 11:02:32 +08:00
if numOfRow != len ( * records ) {
2021-11-16 09:27:36 +08:00
return errors . New ( "entityIDs row num not equal to length of records" )
2021-10-15 11:02:32 +08:00
}
2020-11-05 10:52:50 +08:00
var rawData = make ( [ ] byte , numOfRow * sizeofPerRow )
var copyOffset = 0
for i := 0 ; i < len ( * records ) ; i ++ {
copy ( rawData [ copyOffset : ] , ( * records ) [ i ] . Value )
copyOffset += sizeofPerRow
}
var cOffset = C . long ( offset )
var cNumOfRows = C . long ( numOfRow )
var cEntityIdsPtr = ( * C . long ) ( & ( * entityIDs ) [ 0 ] )
var cTimestampsPtr = ( * C . ulong ) ( & ( * timestamps ) [ 0 ] )
var cSizeofPerRow = C . int ( sizeofPerRow )
var cRawDataVoidPtr = unsafe . Pointer ( & rawData [ 0 ] )
2021-06-05 16:21:36 +08:00
log . Debug ( "QueryNode::Segment::InsertBegin" , zap . Any ( "cNumOfRows" , cNumOfRows ) )
2021-11-11 10:18:42 +08:00
status := C . Insert ( s . segmentPtr ,
2020-11-05 10:52:50 +08:00
cOffset ,
cNumOfRows ,
cEntityIdsPtr ,
cTimestampsPtr ,
cRawDataVoidPtr ,
cSizeofPerRow ,
cNumOfRows )
2021-11-11 10:18:42 +08:00
if err := HandleCStatus ( & status , "Insert failed" ) ; err != nil {
2021-06-05 16:21:36 +08:00
return err
2020-11-05 10:52:50 +08:00
}
2020-09-02 17:18:49 +08:00
2021-01-30 16:02:10 +08:00
s . setRecentlyModified ( true )
2020-09-07 17:01:46 +08:00
return nil
2020-08-28 17:29:26 +08:00
}
2020-11-05 10:52:50 +08:00
func ( s * Segment ) segmentDelete ( offset int64 , entityIDs * [ ] UniqueID , timestamps * [ ] Timestamp ) error {
2020-09-21 18:16:06 +08:00
/ *
2020-11-26 16:01:31 +08:00
CStatus
2021-01-19 11:37:16 +08:00
Delete ( CSegmentInterface c_segment ,
2020-10-24 10:45:57 +08:00
long int reserved_offset ,
long size ,
const long * primary_keys ,
const unsigned long * timestamps ) ;
2020-09-02 16:23:50 +08:00
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( ) // thread safe guaranteed by segCore, use RLock
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
2021-10-15 11:02:32 +08:00
if len ( * entityIDs ) != len ( * timestamps ) {
2021-11-16 09:25:40 +08:00
return errors . New ( "length of entityIDs not equal to length of timestamps" )
2021-10-15 11:02:32 +08:00
}
2020-09-09 15:24:07 +08:00
var cOffset = C . long ( offset )
var cSize = C . long ( len ( * entityIDs ) )
2020-09-12 16:57:37 +08:00
var cEntityIdsPtr = ( * C . long ) ( & ( * entityIDs ) [ 0 ] )
2020-09-09 15:24:07 +08:00
var cTimestampsPtr = ( * C . ulong ) ( & ( * timestamps ) [ 0 ] )
2020-09-02 16:23:50 +08:00
2021-11-11 10:18:42 +08:00
status := C . Delete ( s . segmentPtr , cOffset , cSize , cEntityIdsPtr , cTimestampsPtr )
if err := HandleCStatus ( & status , "Delete failed" ) ; err != nil {
return err
2020-09-02 17:18:49 +08:00
}
2020-09-02 16:23:50 +08:00
2020-09-07 17:01:46 +08:00
return nil
2020-08-28 17:29:26 +08:00
}
2021-01-20 09:36:50 +08:00
//-------------------------------------------------------------------------------------- interfaces for sealed segment
2021-01-21 15:20:23 +08:00
func ( s * Segment ) segmentLoadFieldData ( fieldID int64 , rowCount int , data interface { } ) error {
2020-11-09 16:27:11 +08:00
/ *
2020-11-26 16:01:31 +08:00
CStatus
2021-01-20 09:36:50 +08:00
LoadFieldData ( CSegmentInterface c_segment , CLoadFieldDataInfo load_field_data_info ) ;
2020-11-09 16:27:11 +08:00
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( ) // thread safe guaranteed by segCore, use RLock
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
2021-03-10 22:06:22 +08:00
if s . segmentType != segmentTypeSealed {
2021-03-22 16:36:10 +08:00
errMsg := fmt . Sprintln ( "segmentLoadFieldData failed, illegal segment type " , s . segmentType , "segmentID = " , s . ID ( ) )
return errors . New ( errMsg )
2021-01-19 14:13:49 +08:00
}
2021-01-21 15:20:23 +08:00
// data interface check
var dataPointer unsafe . Pointer
emptyErr := errors . New ( "null field data to be loaded" )
switch d := data . ( type ) {
case [ ] bool :
if len ( d ) <= 0 {
return emptyErr
}
dataPointer = unsafe . Pointer ( & d [ 0 ] )
2021-02-20 09:20:51 +08:00
case [ ] byte :
if len ( d ) <= 0 {
return emptyErr
}
dataPointer = unsafe . Pointer ( & d [ 0 ] )
2021-01-21 15:20:23 +08:00
case [ ] int8 :
if len ( d ) <= 0 {
return emptyErr
}
dataPointer = unsafe . Pointer ( & d [ 0 ] )
case [ ] int16 :
if len ( d ) <= 0 {
return emptyErr
}
dataPointer = unsafe . Pointer ( & d [ 0 ] )
case [ ] int32 :
if len ( d ) <= 0 {
return emptyErr
}
dataPointer = unsafe . Pointer ( & d [ 0 ] )
case [ ] int64 :
if len ( d ) <= 0 {
return emptyErr
}
dataPointer = unsafe . Pointer ( & d [ 0 ] )
case [ ] float32 :
if len ( d ) <= 0 {
return emptyErr
}
dataPointer = unsafe . Pointer ( & d [ 0 ] )
case [ ] float64 :
if len ( d ) <= 0 {
return emptyErr
}
dataPointer = unsafe . Pointer ( & d [ 0 ] )
case [ ] string :
// TODO: support string type
return errors . New ( "we cannot support string type now" )
default :
return errors . New ( "illegal field data type" )
}
2021-01-20 09:36:50 +08:00
/ *
2021-01-21 15:20:23 +08:00
typedef struct CLoadFieldDataInfo {
2021-01-20 09:36:50 +08:00
int64_t field_id ;
void * blob ;
int64_t row_count ;
2021-01-21 15:20:23 +08:00
} CLoadFieldDataInfo ;
2021-01-20 09:36:50 +08:00
* /
loadInfo := C . CLoadFieldDataInfo {
field_id : C . int64_t ( fieldID ) ,
2021-01-21 15:20:23 +08:00
blob : dataPointer ,
2021-01-20 09:36:50 +08:00
row_count : C . int64_t ( rowCount ) ,
2021-01-19 18:32:57 +08:00
}
2021-11-11 10:18:42 +08:00
status := C . LoadFieldData ( s . segmentPtr , loadInfo )
if err := HandleCStatus ( & status , "LoadFieldData failed" ) ; err != nil {
return err
2020-12-24 20:55:40 +08:00
}
2021-03-22 16:36:10 +08:00
log . Debug ( "load field done" ,
zap . Int64 ( "fieldID" , fieldID ) ,
zap . Int ( "row count" , rowCount ) ,
zap . Int64 ( "segmentID" , s . ID ( ) ) )
return nil
}
2021-10-22 18:51:14 +08:00
func ( s * Segment ) segmentLoadDeletedRecord ( primaryKeys [ ] IntPrimaryKey , timestamps [ ] Timestamp , rowCount int64 ) error {
2021-10-22 13:11:11 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( ) // thread safe guaranteed by segCore, use RLock
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
if s . segmentType != segmentTypeSealed {
errMsg := fmt . Sprintln ( "segmentLoadFieldData failed, illegal segment type " , s . segmentType , "segmentID = " , s . ID ( ) )
return errors . New ( errMsg )
}
2021-10-22 18:51:14 +08:00
loadInfo := C . CLoadDeletedRecordInfo {
timestamps : unsafe . Pointer ( & timestamps [ 0 ] ) ,
primary_keys : unsafe . Pointer ( & primaryKeys [ 0 ] ) ,
row_count : C . int64_t ( rowCount ) ,
}
/ *
CStatus
LoadDeletedRecord ( CSegmentInterface c_segment , CLoadDeletedRecordInfo deleted_record_info )
* /
2021-11-11 10:18:42 +08:00
status := C . LoadDeletedRecord ( s . segmentPtr , loadInfo )
if err := HandleCStatus ( & status , "LoadDeletedRecord failed" ) ; err != nil {
return err
2021-10-22 18:51:14 +08:00
}
2021-10-22 13:11:11 +08:00
2021-10-22 18:51:14 +08:00
log . Debug ( "load deleted record done" ,
zap . Int64 ( "row count" , rowCount ) ,
zap . Int64 ( "segmentID" , s . ID ( ) ) )
2021-10-22 13:11:11 +08:00
return nil
}
2021-03-22 16:36:10 +08:00
func ( s * Segment ) dropFieldData ( fieldID int64 ) error {
/ *
CStatus
DropFieldData ( CSegmentInterface c_segment , int64_t field_id ) ;
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( ) // thread safe guaranteed by segCore, use RLock
2021-03-22 16:36:10 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
if s . segmentType != segmentTypeIndexing {
errMsg := fmt . Sprintln ( "dropFieldData failed, illegal segment type " , s . segmentType , "segmentID = " , s . ID ( ) )
return errors . New ( errMsg )
}
2021-11-11 10:18:42 +08:00
status := C . DropFieldData ( s . segmentPtr , C . long ( fieldID ) )
if err := HandleCStatus ( & status , "DropFieldData failed" ) ; err != nil {
return err
2021-03-22 16:36:10 +08:00
}
log . Debug ( "dropFieldData done" , zap . Int64 ( "fieldID" , fieldID ) , zap . Int64 ( "segmentID" , s . ID ( ) ) )
return nil
}
2021-04-07 18:29:19 +08:00
func ( s * Segment ) updateSegmentIndex ( bytesIndex [ ] [ ] byte , fieldID UniqueID ) error {
loadIndexInfo , err := newLoadIndexInfo ( )
defer deleteLoadIndexInfo ( loadIndexInfo )
if err != nil {
return err
}
err = loadIndexInfo . appendFieldInfo ( fieldID )
if err != nil {
return err
}
indexParams := s . getIndexParams ( fieldID )
for k , v := range indexParams {
err = loadIndexInfo . appendIndexParam ( k , v )
if err != nil {
return err
}
}
indexPaths := s . getIndexPaths ( fieldID )
err = loadIndexInfo . appendIndex ( bytesIndex , indexPaths )
if err != nil {
return err
}
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( ) // thread safe guaranteed by segCore, use RLock
2021-03-22 16:36:10 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
if s . segmentType != segmentTypeSealed {
errMsg := fmt . Sprintln ( "updateSegmentIndex failed, illegal segment type " , s . segmentType , "segmentID = " , s . ID ( ) )
return errors . New ( errMsg )
}
status := C . UpdateSealedSegmentIndex ( s . segmentPtr , loadIndexInfo . cLoadIndexInfo )
2021-11-11 10:18:42 +08:00
if err := HandleCStatus ( & status , "UpdateSealedSegmentIndex failed" ) ; err != nil {
return err
2021-03-22 16:36:10 +08:00
}
s . setType ( segmentTypeIndexing )
log . Debug ( "updateSegmentIndex done" , zap . Int64 ( "segmentID" , s . ID ( ) ) )
return nil
}
func ( s * Segment ) dropSegmentIndex ( fieldID int64 ) error {
/ *
CStatus
DropSealedSegmentIndex ( CSegmentInterface c_segment , int64_t field_id ) ;
* /
2021-07-29 16:03:22 +08:00
s . segPtrMu . RLock ( )
defer s . segPtrMu . RUnlock ( ) // thread safe guaranteed by segCore, use RLock
2021-03-22 16:36:10 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
if s . segmentType != segmentTypeIndexing {
errMsg := fmt . Sprintln ( "dropFieldData failed, illegal segment type " , s . segmentType , "segmentID = " , s . ID ( ) )
return errors . New ( errMsg )
}
2021-11-11 10:18:42 +08:00
status := C . DropSealedSegmentIndex ( s . segmentPtr , C . long ( fieldID ) )
if err := HandleCStatus ( & status , "DropSealedSegmentIndex failed" ) ; err != nil {
return err
2021-03-22 16:36:10 +08:00
}
log . Debug ( "dropSegmentIndex done" , zap . Int64 ( "fieldID" , fieldID ) , zap . Int64 ( "segmentID" , s . ID ( ) ) )
2021-03-05 09:21:35 +08:00
2020-12-24 20:55:40 +08:00
return nil
}