2021-12-10 20:59:42 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 13:47:10 +08:00
// with the License. You may obtain a copy of the License at
//
2021-12-10 20:59:42 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 13:47:10 +08:00
//
2021-12-10 20:59:42 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 13:47:10 +08:00
2021-01-16 10:12:14 +08:00
package querynode
2020-08-25 15:45:19 +08:00
2020-08-29 17:42:41 +08:00
/ *
2020-10-23 18:01:24 +08:00
# cgo CFLAGS : - I $ { SRCDIR } / . . / core / output / include
2022-02-09 14:27:46 +08:00
# cgo darwin LDFLAGS : - L $ { SRCDIR } / . . / core / output / lib - lmilvus_segcore - Wl , - rpath , "${SRCDIR}/../core/output/lib"
# cgo linux LDFLAGS : - L $ { SRCDIR } / . . / core / output / lib - lmilvus_segcore - Wl , - rpath = $ { SRCDIR } / . . / core / output / lib
2022-03-17 17:17:22 +08:00
# cgo windows LDFLAGS : - L $ { SRCDIR } / . . / core / output / lib - lmilvus_segcore - Wl , - rpath = $ { SRCDIR } / . . / core / output / lib
2020-08-29 17:42:41 +08:00
2020-11-25 10:31:51 +08:00
# include "segcore/collection_c.h"
# include "segcore/plan_c.h"
# include "segcore/reduce_c.h"
2020-08-29 17:42:41 +08:00
* /
2020-08-25 15:45:19 +08:00
import "C"
2020-08-28 17:29:26 +08:00
import (
2021-08-14 11:18:10 +08:00
"bytes"
"encoding/binary"
2021-06-09 11:37:55 +08:00
"errors"
2021-03-22 16:36:10 +08:00
"fmt"
2021-01-12 18:03:24 +08:00
"sync"
2020-11-05 10:52:50 +08:00
"unsafe"
2020-11-04 17:58:43 +08:00
2022-03-30 21:11:28 +08:00
"github.com/milvus-io/milvus/internal/util/funcutil"
2022-03-03 16:05:57 +08:00
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/util/timerecord"
2021-08-28 10:12:00 +08:00
"github.com/bits-and-blooms/bloom/v3"
2021-11-15 11:05:09 +08:00
"github.com/golang/protobuf/proto"
2021-03-05 09:21:35 +08:00
"go.uber.org/zap"
2020-11-09 16:27:11 +08:00
2021-11-02 18:16:32 +08:00
"github.com/milvus-io/milvus/internal/common"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
2021-07-16 17:19:55 +08:00
"github.com/milvus-io/milvus/internal/proto/datapb"
2022-02-08 21:57:46 +08:00
"github.com/milvus-io/milvus/internal/proto/querypb"
2021-08-14 11:18:10 +08:00
"github.com/milvus-io/milvus/internal/proto/schemapb"
2021-06-15 14:43:57 +08:00
"github.com/milvus-io/milvus/internal/proto/segcorepb"
2021-08-14 11:18:10 +08:00
"github.com/milvus-io/milvus/internal/storage"
2020-08-28 17:29:26 +08:00
)
2022-02-08 21:57:46 +08:00
// segmentType is an alias of commonpb.SegmentState, used in this file to
// describe which kind of segment a Segment wraps.
type segmentType = commonpb.SegmentState
2021-03-10 22:06:22 +08:00
2021-01-20 09:36:50 +08:00
const (
2022-02-08 21:57:46 +08:00
segmentTypeGrowing = commonpb . SegmentState_Growing
segmentTypeSealed = commonpb . SegmentState_Sealed
2021-01-20 09:36:50 +08:00
)
2021-10-18 20:08:42 +08:00
// Sizing parameters for a segment's primary-key bloom filter: the estimated
// number of elements and the highest acceptable false-positive rate.
const (
	bloomFilterSize       uint    = 100000
	maxBloomFalsePositive float64 = 0.005
)
2022-03-30 21:11:28 +08:00
// IndexedFieldInfo contains binlog info of vector field
type IndexedFieldInfo struct {
2021-07-24 09:25:22 +08:00
fieldBinlog * datapb . FieldBinlog
2022-03-30 21:11:28 +08:00
indexInfo * querypb . FieldIndexInfo
2021-07-16 17:19:55 +08:00
}
2021-11-03 23:50:14 +08:00
// Segment is a wrapper of the underlying C-structure segment.
2020-08-25 15:45:19 +08:00
type Segment struct {
2021-01-30 16:02:10 +08:00
segmentPtr C . CSegmentInterface
2021-01-13 10:40:46 +08:00
segmentID UniqueID
2021-01-20 09:36:50 +08:00
partitionID UniqueID
2021-01-13 10:40:46 +08:00
collectionID UniqueID
2021-06-15 12:41:40 +08:00
2021-06-15 20:06:10 +08:00
vChannelID Channel
2021-01-13 10:40:46 +08:00
lastMemSize int64
lastRowCount int64
2022-01-10 23:17:36 +08:00
rmMutex sync . RWMutex // guards recentlyModified
2020-11-05 10:52:50 +08:00
recentlyModified bool
2021-01-13 10:40:46 +08:00
2021-01-30 16:02:10 +08:00
typeMu sync . Mutex // guards builtIndex
2021-03-10 22:06:22 +08:00
segmentType segmentType
2021-01-30 16:02:10 +08:00
2021-07-24 09:25:22 +08:00
idBinlogRowSizes [ ] int64
2022-03-30 21:11:28 +08:00
indexedFieldMutex sync . RWMutex // guards indexedFieldInfos
indexedFieldInfos map [ UniqueID ] * IndexedFieldInfo
2021-08-28 10:12:00 +08:00
pkFilter * bloom . BloomFilter // bloom filter of pk inside a segment
2020-08-25 15:45:19 +08:00
}
2021-11-03 23:50:14 +08:00
// ID returns the identity number.
2020-11-09 16:27:11 +08:00
func ( s * Segment ) ID ( ) UniqueID {
return s . segmentID
}
2021-07-24 09:25:22 +08:00
// setIDBinlogRowSizes replaces the per-binlog row sizes of the segment.
// The slice is stored as-is, without copying and without locking.
func (s *Segment) setIDBinlogRowSizes(sizes []int64) {
	s.idBinlogRowSizes = sizes
}
// getIDBinlogRowSizes returns the per-binlog row sizes of the segment.
// The internal slice itself is returned, not a copy.
func (s *Segment) getIDBinlogRowSizes() []int64 {
	return s.idBinlogRowSizes
}
2021-01-30 16:02:10 +08:00
func ( s * Segment ) setRecentlyModified ( modify bool ) {
2021-01-13 10:40:46 +08:00
s . rmMutex . Lock ( )
defer s . rmMutex . Unlock ( )
2021-01-12 18:03:24 +08:00
s . recentlyModified = modify
}
2021-01-30 16:02:10 +08:00
func ( s * Segment ) getRecentlyModified ( ) bool {
2022-01-10 23:17:36 +08:00
s . rmMutex . RLock ( )
defer s . rmMutex . RUnlock ( )
2021-01-12 18:03:24 +08:00
return s . recentlyModified
}
2021-01-30 16:02:10 +08:00
// setType changes the segment type while holding typeMu.
func (s *Segment) setType(segType segmentType) {
	s.typeMu.Lock()
	s.segmentType = segType
	s.typeMu.Unlock()
}
func ( s * Segment ) getType ( ) segmentType {
s . typeMu . Lock ( )
defer s . typeMu . Unlock ( )
return s . segmentType
2020-11-09 16:27:11 +08:00
}
2022-03-30 21:11:28 +08:00
func ( s * Segment ) setIndexedFieldInfo ( fieldID UniqueID , info * IndexedFieldInfo ) {
s . indexedFieldMutex . Lock ( )
defer s . indexedFieldMutex . Unlock ( )
s . indexedFieldInfos [ fieldID ] = info
2021-07-16 17:19:55 +08:00
}
2022-03-30 21:11:28 +08:00
func ( s * Segment ) getIndexedFieldInfo ( fieldID UniqueID ) ( * IndexedFieldInfo , error ) {
s . indexedFieldMutex . RLock ( )
defer s . indexedFieldMutex . RUnlock ( )
if info , ok := s . indexedFieldInfos [ fieldID ] ; ok {
return & IndexedFieldInfo {
2022-02-08 21:57:46 +08:00
fieldBinlog : info . fieldBinlog ,
indexInfo : info . indexInfo ,
} , nil
2021-07-16 17:19:55 +08:00
}
2022-04-25 16:11:48 +08:00
return nil , fmt . Errorf ( "Invalid fieldID %d" , fieldID )
2021-07-16 17:19:55 +08:00
}
2022-03-30 21:11:28 +08:00
func ( s * Segment ) hasLoadIndexForIndexedField ( fieldID int64 ) bool {
s . indexedFieldMutex . RLock ( )
defer s . indexedFieldMutex . RUnlock ( )
2022-02-08 21:57:46 +08:00
2022-03-30 21:11:28 +08:00
if fieldInfo , ok := s . indexedFieldInfos [ fieldID ] ; ok {
2022-02-08 21:57:46 +08:00
return fieldInfo . indexInfo != nil && fieldInfo . indexInfo . EnableIndex
}
return false
}
2022-05-23 16:41:58 +08:00
func newSegment ( collection * Collection , segmentID UniqueID , partitionID UniqueID , collectionID UniqueID , vChannelID Channel , segType segmentType ) ( * Segment , error ) {
2021-01-20 09:36:50 +08:00
/ *
CSegmentInterface
NewSegment ( CCollection collection , uint64_t segment_id , SegmentType seg_type ) ;
* /
2021-02-07 21:26:03 +08:00
var segmentPtr C . CSegmentInterface
switch segType {
2021-03-10 22:06:22 +08:00
case segmentTypeSealed :
2022-01-11 18:07:34 +08:00
segmentPtr = C . NewSegment ( collection . collectionPtr , C . Sealed , C . int64_t ( segmentID ) )
2021-03-10 22:06:22 +08:00
case segmentTypeGrowing :
2022-01-11 18:07:34 +08:00
segmentPtr = C . NewSegment ( collection . collectionPtr , C . Growing , C . int64_t ( segmentID ) )
2021-02-07 21:26:03 +08:00
default :
2022-02-08 21:57:46 +08:00
err := fmt . Errorf ( "illegal segment type %d when create segment %d" , segType , segmentID )
log . Error ( "create new segment error" ,
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Int64 ( "partitionID" , partitionID ) ,
zap . Int64 ( "segmentID" , segmentID ) ,
zap . Int32 ( "segment type" , int32 ( segType ) ) ,
zap . Error ( err ) )
return nil , err
2021-02-07 21:26:03 +08:00
}
2022-05-07 10:27:51 +08:00
log . Info ( "create segment" ,
2022-02-08 21:57:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Int64 ( "partitionID" , partitionID ) ,
zap . Int64 ( "segmentID" , segmentID ) ,
zap . Int32 ( "segmentType" , int32 ( segType ) ) )
2021-03-05 09:21:35 +08:00
2021-07-16 17:19:55 +08:00
var segment = & Segment {
2022-03-30 21:11:28 +08:00
segmentPtr : segmentPtr ,
segmentType : segType ,
segmentID : segmentID ,
partitionID : partitionID ,
collectionID : collectionID ,
vChannelID : vChannelID ,
indexedFieldInfos : make ( map [ UniqueID ] * IndexedFieldInfo ) ,
2021-10-18 20:08:42 +08:00
pkFilter : bloom . NewWithEstimates ( bloomFilterSize , maxBloomFalsePositive ) ,
2021-01-20 09:36:50 +08:00
}
2022-02-08 21:57:46 +08:00
return segment , nil
2021-01-20 09:36:50 +08:00
}
2020-11-09 16:27:11 +08:00
func deleteSegment ( segment * Segment ) {
/ *
void
2021-01-19 11:37:16 +08:00
deleteSegment ( CSegmentInterface segment ) ;
2020-11-09 16:27:11 +08:00
* /
2021-08-12 14:14:08 +08:00
if segment . segmentPtr == nil {
return
}
2020-11-09 16:27:11 +08:00
cPtr := segment . segmentPtr
C . DeleteSegment ( cPtr )
2021-02-02 19:54:31 +08:00
segment . segmentPtr = nil
2021-03-05 09:21:35 +08:00
2022-05-07 10:27:51 +08:00
log . Info ( "delete segment from memory" , zap . Int64 ( "collectionID" , segment . collectionID ) , zap . Int64 ( "partitionID" , segment . partitionID ) , zap . Int64 ( "segmentID" , segment . ID ( ) ) )
2020-11-09 16:27:11 +08:00
}
2020-11-05 10:52:50 +08:00
func ( s * Segment ) getRowCount ( ) int64 {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
getRowCount ( CSegmentInterface c_segment ) ;
2020-09-03 19:58:33 +08:00
* /
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return - 1
}
2020-11-09 16:27:11 +08:00
var rowCount = C . GetRowCount ( s . segmentPtr )
2020-09-03 19:58:33 +08:00
return int64 ( rowCount )
2020-08-25 15:45:19 +08:00
}
2020-11-05 10:52:50 +08:00
func ( s * Segment ) getDeletedCount ( ) int64 {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
getDeletedCount ( CSegmentInterface c_segment ) ;
2020-09-03 19:58:33 +08:00
* /
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return - 1
}
2020-11-09 16:27:11 +08:00
var deletedCount = C . GetDeletedCount ( s . segmentPtr )
2020-09-03 19:58:33 +08:00
return int64 ( deletedCount )
}
2020-11-05 10:52:50 +08:00
func ( s * Segment ) getMemSize ( ) int64 {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
GetMemoryUsageInBytes ( CSegmentInterface c_segment ) ;
2020-09-21 15:10:54 +08:00
* /
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return - 1
}
2020-11-09 16:27:11 +08:00
var memoryUsageInBytes = C . GetMemoryUsageInBytes ( s . segmentPtr )
2020-09-21 15:10:54 +08:00
2020-10-29 19:55:57 +08:00
return int64 ( memoryUsageInBytes )
2020-09-16 15:21:10 +08:00
}
2022-05-23 16:41:58 +08:00
func ( s * Segment ) search ( searchReq * searchRequest ) ( * SearchResult , error ) {
2021-01-20 09:36:50 +08:00
/ *
CStatus
Search ( void * plan ,
void * placeholder_groups ,
uint64_t * timestamps ,
int num_groups ,
long int * result_ids ,
float * result_distances ) ;
* /
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return nil , errors . New ( "null seg core pointer" )
}
2022-05-23 16:41:58 +08:00
if searchReq . plan == nil {
return nil , fmt . Errorf ( "nil search plan" )
2021-01-20 09:36:50 +08:00
}
var searchResult SearchResult
2022-05-30 15:30:01 +08:00
log . Debug ( "do search on segment" , zap . Int64 ( "segmentID" , s . segmentID ) , zap . String ( "segmentType" , s . segmentType . String ( ) ) )
2022-03-02 14:49:55 +08:00
tr := timerecord . NewTimeRecorder ( "cgoSearch" )
2022-05-23 16:41:58 +08:00
status := C . Search ( s . segmentPtr , searchReq . plan . cSearchPlan , searchReq . cPlaceholderGroup ,
C . uint64_t ( searchReq . timestamp ) , & searchResult . cSearchResult , C . int64_t ( s . segmentID ) )
2022-04-24 22:03:44 +08:00
metrics . QueryNodeSQSegmentLatencyInCore . WithLabelValues ( fmt . Sprint ( Params . QueryNodeCfg . GetNodeID ( ) ) , metrics . SearchLabel ) . Observe ( float64 ( tr . ElapseSpan ( ) . Milliseconds ( ) ) )
2021-11-11 10:18:42 +08:00
if err := HandleCStatus ( & status , "Search failed" ) ; err != nil {
return nil , err
2021-01-20 09:36:50 +08:00
}
return & searchResult , nil
}
2021-11-10 17:15:37 +08:00
func ( s * Segment ) retrieve ( plan * RetrievePlan ) ( * segcorepb . RetrieveResults , error ) {
2021-07-29 16:03:22 +08:00
if s . segmentPtr == nil {
return nil , errors . New ( "null seg core pointer" )
}
2021-11-12 10:04:49 +08:00
var retrieveResult RetrieveResult
ts := C . uint64_t ( plan . Timestamp )
2022-03-02 14:49:55 +08:00
tr := timerecord . NewTimeRecorder ( "cgoRetrieve" )
2021-11-12 10:04:49 +08:00
status := C . Retrieve ( s . segmentPtr , plan . cRetrievePlan , ts , & retrieveResult . cRetrieveResult )
2022-04-24 22:03:44 +08:00
metrics . QueryNodeSQSegmentLatencyInCore . WithLabelValues ( fmt . Sprint ( Params . QueryNodeCfg . GetNodeID ( ) ) ,
2022-03-15 21:51:21 +08:00
metrics . QueryLabel ) . Observe ( float64 ( tr . ElapseSpan ( ) . Milliseconds ( ) ) )
2022-05-30 15:30:01 +08:00
log . Debug ( "do retrieve on segment" , zap . Int64 ( "segmentID" , s . segmentID ) , zap . String ( "segmentType" , s . segmentType . String ( ) ) )
2021-11-12 10:04:49 +08:00
if err := HandleCStatus ( & status , "Retrieve failed" ) ; err != nil {
return nil , err
}
2021-06-15 14:43:57 +08:00
result := new ( segcorepb . RetrieveResults )
2021-11-12 10:04:49 +08:00
if err := HandleCProto ( & retrieveResult . cRetrieveResult , result ) ; err != nil {
2021-06-04 10:38:34 +08:00
return nil , err
}
return result , nil
}
2022-03-30 21:11:28 +08:00
// getFieldDataPath maps a segment-wide row offset to the binlog that holds
// it. It walks s.idBinlogRowSizes, subtracting each size from offset until
// the remainder falls inside a binlog, and returns that binlog's log path
// together with the remaining offset inside it.
//
// When offset lies beyond the sum of all sizes, dataPath is empty and
// offsetInBinlog is whatever remains after every size has been subtracted.
func (s *Segment) getFieldDataPath(indexedFieldInfo *IndexedFieldInfo, offset int64) (dataPath string, offsetInBinlog int64) {
	offsetInBinlog = offset
	for binlogIdx, rows := range s.idBinlogRowSizes {
		if offsetInBinlog >= rows {
			// The row lives in a later binlog; skip over this one.
			offsetInBinlog -= rows
			continue
		}
		return indexedFieldInfo.fieldBinlog.Binlogs[binlogIdx].GetLogPath(), offsetInBinlog
	}
	return dataPath, offsetInBinlog
}
// fillBinVecFieldData reads the binary vector at row `offset` of dataPath
// (dim/8 bytes per row) and copies it into slot i of fieldData's binary
// vector buffer. The endian argument is not used by this function.
func fillBinVecFieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	vectors := fieldData.GetVectors()
	rowBytes := vectors.GetDim() / 8

	raw, err := vcm.ReadAt(dataPath, offset*rowBytes, rowBytes)
	if err != nil {
		return err
	}

	dst := vectors.GetData().(*schemapb.VectorField_BinaryVector)
	width := int(rowBytes)
	start := i * width
	copy(dst.BinaryVector[start:start+width], raw)
	return nil
}
// fillFloatVecFieldData reads the float vector at row `offset` of dataPath
// (dim*4 bytes per row), decodes it with the given byte order and copies the
// dim values into slot i of fieldData's float vector buffer.
func fillFloatVecFieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	vectors := fieldData.GetVectors()
	dim := vectors.GetDim()
	rowBytes := dim * 4

	raw, err := vcm.ReadAt(dataPath, offset*rowBytes, rowBytes)
	if err != nil {
		return err
	}

	dst := vectors.GetData().(*schemapb.VectorField_FloatVector)
	row := make([]float32, dim)
	if err = binary.Read(bytes.NewReader(raw), endian, &row); err != nil {
		return err
	}

	width := int(dim)
	start := i * width
	copy(dst.FloatVector.Data[start:start+width], row)
	return nil
}
// fillBoolFieldData reads the whole file at dataPath, unmarshals it as a
// schemapb.BoolArray and stores element `offset` in slot i of fieldData's
// bool scalar array. The endian argument is not used by this function.
//
// It returns an error when the read or the unmarshal fails, or when offset
// is outside the decoded array.
func fillBoolFieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	// read whole file.
	// TODO: optimize here.
	content, err := vcm.Read(dataPath)
	if err != nil {
		return err
	}
	var arr schemapb.BoolArray
	err = proto.Unmarshal(content, &arr)
	if err != nil {
		return err
	}
	// The array comes from storage; a short or corrupt file must surface as
	// an error, not as an index-out-of-range panic.
	if offset < 0 || offset >= int64(len(arr.Data)) {
		return fmt.Errorf("offset %d out of range [0, %d) in bool data of %s", offset, len(arr.Data), dataPath)
	}
	fieldData.GetScalars().GetBoolData().GetData()[i] = arr.Data[offset]
	return nil
}
// fillStringFieldData reads the whole file at dataPath, unmarshals it as a
// schemapb.StringArray and stores element `offset` in slot i of fieldData's
// string scalar array. The endian argument is not used by this function.
//
// It returns an error when the read or the unmarshal fails, or when offset
// is outside the decoded array.
func fillStringFieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	// read whole file.
	// TODO: optimize here.
	content, err := vcm.Read(dataPath)
	if err != nil {
		return err
	}
	var arr schemapb.StringArray
	err = proto.Unmarshal(content, &arr)
	if err != nil {
		return err
	}
	// The array comes from storage; a short or corrupt file must surface as
	// an error, not as an index-out-of-range panic.
	if offset < 0 || offset >= int64(len(arr.Data)) {
		return fmt.Errorf("offset %d out of range [0, %d) in string data of %s", offset, len(arr.Data), dataPath)
	}
	fieldData.GetScalars().GetStringData().GetData()[i] = arr.Data[offset]
	return nil
}
// fillInt8FieldData reads the single byte at row `offset` of dataPath,
// decodes it as an int8 and stores it, widened to int32, in slot i of
// fieldData's int scalar array.
func fillInt8FieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	// read by offset.
	const width = int64(1) // bytes per value
	raw, err := vcm.ReadAt(dataPath, offset*width, width)
	if err != nil {
		return err
	}

	var value int8
	if err = funcutil.ReadBinary(endian, raw, &value); err != nil {
		return err
	}
	fieldData.GetScalars().GetIntData().GetData()[i] = int32(value)
	return nil
}
// fillInt16FieldData reads the two bytes at row `offset` of dataPath,
// decodes them as an int16 and stores the value, widened to int32, in slot
// i of fieldData's int scalar array.
func fillInt16FieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	// read by offset.
	const width = int64(2) // bytes per value
	raw, err := vcm.ReadAt(dataPath, offset*width, width)
	if err != nil {
		return err
	}

	var value int16
	if err = funcutil.ReadBinary(endian, raw, &value); err != nil {
		return err
	}
	fieldData.GetScalars().GetIntData().GetData()[i] = int32(value)
	return nil
}
// fillInt32FieldData reads the four bytes at row `offset` of dataPath and
// decodes them directly into slot i of fieldData's int scalar array.
func fillInt32FieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	// read by offset.
	const width = int64(4) // bytes per value
	raw, err := vcm.ReadAt(dataPath, offset*width, width)
	if err != nil {
		return err
	}

	slot := &(fieldData.GetScalars().GetIntData().GetData()[i])
	return funcutil.ReadBinary(endian, raw, slot)
}
// fillInt64FieldData reads the eight bytes at row `offset` of dataPath and
// decodes them directly into slot i of fieldData's long scalar array.
func fillInt64FieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	// read by offset.
	const width = int64(8) // bytes per value
	raw, err := vcm.ReadAt(dataPath, offset*width, width)
	if err != nil {
		return err
	}

	slot := &(fieldData.GetScalars().GetLongData().GetData()[i])
	return funcutil.ReadBinary(endian, raw, slot)
}
// fillFloatFieldData reads the four bytes at row `offset` of dataPath and
// decodes them directly into slot i of fieldData's float scalar array.
func fillFloatFieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	// read by offset.
	const width = int64(4) // bytes per value
	raw, err := vcm.ReadAt(dataPath, offset*width, width)
	if err != nil {
		return err
	}

	slot := &(fieldData.GetScalars().GetFloatData().GetData()[i])
	return funcutil.ReadBinary(endian, raw, slot)
}
// fillDoubleFieldData reads the eight bytes at row `offset` of dataPath and
// decodes them directly into slot i of fieldData's double scalar array.
func fillDoubleFieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	// read by offset.
	const width = int64(8) // bytes per value
	raw, err := vcm.ReadAt(dataPath, offset*width, width)
	if err != nil {
		return err
	}

	slot := &(fieldData.GetScalars().GetDoubleData().GetData()[i])
	return funcutil.ReadBinary(endian, raw, slot)
}
// fillFieldData picks the type-specific fill function that matches
// fieldData.Type and forwards all arguments to it. String and VarChar share
// the string filler. An unsupported type yields an "invalid data type"
// error.
func fillFieldData(vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
	var fill func(storage.ChunkManager, string, *schemapb.FieldData, int, int64, binary.ByteOrder) error

	switch fieldData.Type {
	case schemapb.DataType_BinaryVector:
		fill = fillBinVecFieldData
	case schemapb.DataType_FloatVector:
		fill = fillFloatVecFieldData
	case schemapb.DataType_Bool:
		fill = fillBoolFieldData
	case schemapb.DataType_String, schemapb.DataType_VarChar:
		fill = fillStringFieldData
	case schemapb.DataType_Int8:
		fill = fillInt8FieldData
	case schemapb.DataType_Int16:
		fill = fillInt16FieldData
	case schemapb.DataType_Int32:
		fill = fillInt32FieldData
	case schemapb.DataType_Int64:
		fill = fillInt64FieldData
	case schemapb.DataType_Float:
		fill = fillFloatFieldData
	case schemapb.DataType_Double:
		fill = fillDoubleFieldData
	default:
		return fmt.Errorf("invalid data type: %s", fieldData.Type.String())
	}

	return fill(vcm, dataPath, fieldData, i, offset, endian)
}
func ( s * Segment ) fillIndexedFieldsData ( collectionID UniqueID ,
2021-08-18 16:30:11 +08:00
vcm storage . ChunkManager , result * segcorepb . RetrieveResults ) error {
2021-08-14 11:18:10 +08:00
for _ , fieldData := range result . FieldsData {
2022-03-30 21:11:28 +08:00
// If the vector field doesn't have indexed. Vector data is in memory for
// brute force search. No need to download data from remote.
if ! s . hasLoadIndexForIndexedField ( fieldData . FieldId ) {
2021-08-14 11:18:10 +08:00
continue
}
2022-03-30 21:11:28 +08:00
indexedFieldInfo , err := s . getIndexedFieldInfo ( fieldData . FieldId )
2021-08-14 11:18:10 +08:00
if err != nil {
continue
}
2021-11-23 14:25:15 +08:00
2022-03-30 21:11:28 +08:00
// TODO: optimize here. Now we'll read a whole file from storage every time we retrieve raw data by offset.
2021-08-14 11:18:10 +08:00
for i , offset := range result . Offset {
2022-03-30 21:11:28 +08:00
dataPath , offsetInBinlog := s . getFieldDataPath ( indexedFieldInfo , offset )
endian := common . Endian
// fill field data that fieldData[i] = dataPath[offsetInBinlog*rowBytes, (offsetInBinlog+1)*rowBytes]
if err := fillFieldData ( vcm , dataPath , fieldData , i , offsetInBinlog , endian ) ; err != nil {
return err
2021-08-14 11:18:10 +08:00
}
}
}
2022-03-30 21:11:28 +08:00
2021-08-14 11:18:10 +08:00
return nil
}
2022-04-02 17:43:29 +08:00
func ( s * Segment ) updateBloomFilter ( pks [ ] primaryKey ) {
2021-10-19 20:18:47 +08:00
buf := make ( [ ] byte , 8 )
for _ , pk := range pks {
2022-04-02 17:43:29 +08:00
switch pk . Type ( ) {
case schemapb . DataType_Int64 :
int64Value := pk . ( * int64PrimaryKey ) . Value
common . Endian . PutUint64 ( buf , uint64 ( int64Value ) )
s . pkFilter . Add ( buf )
case schemapb . DataType_VarChar :
stringValue := pk . ( * varCharPrimaryKey ) . Value
s . pkFilter . AddString ( stringValue )
default :
2022-05-07 10:27:51 +08:00
log . Warn ( "failed to update bloomfilter" , zap . Any ( "PK type" , pk . Type ( ) ) )
2022-04-02 17:43:29 +08:00
}
2021-10-19 20:18:47 +08:00
}
}
2021-01-20 09:36:50 +08:00
//-------------------------------------------------------------------------------------- interfaces for growing segment
2021-03-12 19:23:06 +08:00
func ( s * Segment ) segmentPreInsert ( numOfRecords int ) ( int64 , error ) {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
PreInsert ( CSegmentInterface c_segment , long int size ) ;
2020-09-09 15:24:07 +08:00
* /
2021-06-15 12:41:40 +08:00
if s . segmentType != segmentTypeGrowing {
2021-03-12 19:23:06 +08:00
return 0 , nil
}
var offset int64
2022-02-07 23:43:46 +08:00
cOffset := ( * C . int64_t ) ( & offset )
status := C . PreInsert ( s . segmentPtr , C . int64_t ( int64 ( numOfRecords ) ) , cOffset )
2021-11-11 10:18:42 +08:00
if err := HandleCStatus ( & status , "PreInsert failed" ) ; err != nil {
return 0 , err
2021-03-12 19:23:06 +08:00
}
return offset , nil
2020-09-09 15:24:07 +08:00
}
2020-11-05 10:52:50 +08:00
func ( s * Segment ) segmentPreDelete ( numOfRecords int ) int64 {
2020-09-21 18:16:06 +08:00
/ *
2020-10-24 10:45:57 +08:00
long int
2021-01-19 11:37:16 +08:00
PreDelete ( CSegmentInterface c_segment , long int size ) ;
2020-09-09 15:24:07 +08:00
* /
2022-02-07 23:43:46 +08:00
var offset = C . PreDelete ( s . segmentPtr , C . int64_t ( int64 ( numOfRecords ) ) )
2020-09-09 15:24:07 +08:00
2020-09-12 16:57:37 +08:00
return int64 ( offset )
2020-09-09 15:24:07 +08:00
}
2022-04-29 13:35:49 +08:00
func ( s * Segment ) segmentInsert ( offset int64 , entityIDs [ ] UniqueID , timestamps [ ] Timestamp , record * segcorepb . InsertRecord ) error {
2021-06-15 12:41:40 +08:00
if s . segmentType != segmentTypeGrowing {
2022-05-24 21:11:59 +08:00
return fmt . Errorf ( "unexpected segmentType when segmentInsert, segmentType = %s" , s . segmentType . String ( ) )
2021-03-26 05:08:08 +08:00
}
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
2021-10-18 20:08:42 +08:00
2022-05-13 09:53:53 +08:00
insertRecordBlob , err := proto . Marshal ( record )
if err != nil {
return fmt . Errorf ( "failed to marshal insert record: %s" , err )
}
2020-11-05 10:52:50 +08:00
2022-04-29 13:35:49 +08:00
var numOfRow = len ( entityIDs )
2022-02-07 23:43:46 +08:00
var cOffset = C . int64_t ( offset )
var cNumOfRows = C . int64_t ( numOfRow )
2022-04-29 13:35:49 +08:00
var cEntityIdsPtr = ( * C . int64_t ) ( & ( entityIDs ) [ 0 ] )
var cTimestampsPtr = ( * C . uint64_t ) ( & ( timestamps ) [ 0 ] )
2021-11-11 10:18:42 +08:00
status := C . Insert ( s . segmentPtr ,
2020-11-05 10:52:50 +08:00
cOffset ,
cNumOfRows ,
cEntityIdsPtr ,
cTimestampsPtr ,
2022-05-13 09:53:53 +08:00
( * C . uint8_t ) ( unsafe . Pointer ( & insertRecordBlob [ 0 ] ) ) ,
( C . uint64_t ) ( len ( insertRecordBlob ) ) )
2021-11-11 10:18:42 +08:00
if err := HandleCStatus ( & status , "Insert failed" ) ; err != nil {
2021-06-05 16:21:36 +08:00
return err
2020-11-05 10:52:50 +08:00
}
2022-06-02 16:06:03 +08:00
metrics . QueryNodeNumEntities . WithLabelValues ( fmt . Sprint ( Params . QueryNodeCfg . GetNodeID ( ) ) ) . Add ( float64 ( numOfRow ) )
2021-01-30 16:02:10 +08:00
s . setRecentlyModified ( true )
2020-09-07 17:01:46 +08:00
return nil
2020-08-28 17:29:26 +08:00
}
2022-04-02 17:43:29 +08:00
func ( s * Segment ) segmentDelete ( offset int64 , entityIDs [ ] primaryKey , timestamps [ ] Timestamp ) error {
2020-09-21 18:16:06 +08:00
/ *
2020-11-26 16:01:31 +08:00
CStatus
2021-01-19 11:37:16 +08:00
Delete ( CSegmentInterface c_segment ,
2020-10-24 10:45:57 +08:00
long int reserved_offset ,
long size ,
const long * primary_keys ,
const unsigned long * timestamps ) ;
2020-09-02 16:23:50 +08:00
* /
2022-04-02 17:43:29 +08:00
if len ( entityIDs ) <= 0 {
return fmt . Errorf ( "empty pks to delete" )
}
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
2021-10-15 11:02:32 +08:00
2022-04-02 17:43:29 +08:00
if len ( entityIDs ) != len ( timestamps ) {
2021-11-16 09:25:40 +08:00
return errors . New ( "length of entityIDs not equal to length of timestamps" )
2021-10-15 11:02:32 +08:00
}
2022-02-07 23:43:46 +08:00
var cOffset = C . int64_t ( offset )
2022-04-02 17:43:29 +08:00
var cSize = C . int64_t ( len ( entityIDs ) )
var cTimestampsPtr = ( * C . uint64_t ) ( & ( timestamps ) [ 0 ] )
2020-09-02 16:23:50 +08:00
2022-04-29 13:35:49 +08:00
ids := & schemapb . IDs { }
2022-04-02 17:43:29 +08:00
pkType := entityIDs [ 0 ] . Type ( )
switch pkType {
case schemapb . DataType_Int64 :
int64Pks := make ( [ ] int64 , len ( entityIDs ) )
for index , entity := range entityIDs {
int64Pks [ index ] = entity . ( * int64PrimaryKey ) . Value
}
2022-04-29 13:35:49 +08:00
ids . IdField = & schemapb . IDs_IntId {
IntId : & schemapb . LongArray {
Data : int64Pks ,
} ,
2022-04-02 17:43:29 +08:00
}
case schemapb . DataType_VarChar :
2022-04-29 13:35:49 +08:00
varCharPks := make ( [ ] string , len ( entityIDs ) )
for index , entity := range entityIDs {
varCharPks [ index ] = entity . ( * varCharPrimaryKey ) . Value
}
ids . IdField = & schemapb . IDs_StrId {
StrId : & schemapb . StringArray {
Data : varCharPks ,
} ,
}
2022-04-02 17:43:29 +08:00
default :
return fmt . Errorf ( "invalid data type of primary keys" )
2020-09-02 17:18:49 +08:00
}
2020-09-02 16:23:50 +08:00
2022-05-13 09:53:53 +08:00
dataBlob , err := proto . Marshal ( ids )
if err != nil {
return fmt . Errorf ( "failed to marshal ids: %s" , err )
}
2022-04-29 13:35:49 +08:00
2022-05-13 09:53:53 +08:00
status := C . Delete ( s . segmentPtr , cOffset , cSize , ( * C . uint8_t ) ( unsafe . Pointer ( & dataBlob [ 0 ] ) ) , ( C . uint64_t ) ( len ( dataBlob ) ) , cTimestampsPtr )
2022-04-29 13:35:49 +08:00
if err := HandleCStatus ( & status , "Delete failed" ) ; err != nil {
return err
}
2020-09-07 17:01:46 +08:00
return nil
2020-08-28 17:29:26 +08:00
}
2021-01-20 09:36:50 +08:00
//-------------------------------------------------------------------------------------- interfaces for sealed segment
2022-04-29 13:35:49 +08:00
func ( s * Segment ) segmentLoadFieldData ( fieldID int64 , rowCount int64 , data * schemapb . FieldData ) error {
2020-11-09 16:27:11 +08:00
/ *
2020-11-26 16:01:31 +08:00
CStatus
2021-01-20 09:36:50 +08:00
LoadFieldData ( CSegmentInterface c_segment , CLoadFieldDataInfo load_field_data_info ) ;
2020-11-09 16:27:11 +08:00
* /
2021-02-02 19:54:31 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
2021-03-10 22:06:22 +08:00
if s . segmentType != segmentTypeSealed {
2021-03-22 16:36:10 +08:00
errMsg := fmt . Sprintln ( "segmentLoadFieldData failed, illegal segment type " , s . segmentType , "segmentID = " , s . ID ( ) )
return errors . New ( errMsg )
2021-01-19 14:13:49 +08:00
}
2022-05-13 09:53:53 +08:00
dataBlob , err := proto . Marshal ( data )
if err != nil {
return err
}
2021-01-21 15:20:23 +08:00
2021-01-20 09:36:50 +08:00
loadInfo := C . CLoadFieldDataInfo {
field_id : C . int64_t ( fieldID ) ,
2022-05-13 09:53:53 +08:00
blob : ( * C . uint8_t ) ( unsafe . Pointer ( & dataBlob [ 0 ] ) ) ,
blob_size : C . uint64_t ( len ( dataBlob ) ) ,
2021-01-20 09:36:50 +08:00
row_count : C . int64_t ( rowCount ) ,
2021-01-19 18:32:57 +08:00
}
2021-11-11 10:18:42 +08:00
status := C . LoadFieldData ( s . segmentPtr , loadInfo )
if err := HandleCStatus ( & status , "LoadFieldData failed" ) ; err != nil {
return err
2020-12-24 20:55:40 +08:00
}
2022-05-07 10:27:51 +08:00
log . Info ( "load field done" ,
2021-03-22 16:36:10 +08:00
zap . Int64 ( "fieldID" , fieldID ) ,
2022-04-29 13:35:49 +08:00
zap . Int64 ( "row count" , rowCount ) ,
2021-03-22 16:36:10 +08:00
zap . Int64 ( "segmentID" , s . ID ( ) ) )
return nil
}
2022-04-02 17:43:29 +08:00
func ( s * Segment ) segmentLoadDeletedRecord ( primaryKeys [ ] primaryKey , timestamps [ ] Timestamp , rowCount int64 ) error {
2021-10-22 13:11:11 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
2022-04-02 17:43:29 +08:00
if len ( primaryKeys ) <= 0 {
return fmt . Errorf ( "empty pks to delete" )
2021-10-22 18:51:14 +08:00
}
2022-04-02 17:43:29 +08:00
pkType := primaryKeys [ 0 ] . Type ( )
2022-04-29 13:35:49 +08:00
ids := & schemapb . IDs { }
2022-04-02 17:43:29 +08:00
switch pkType {
case schemapb . DataType_Int64 :
int64Pks := make ( [ ] int64 , len ( primaryKeys ) )
for index , pk := range primaryKeys {
int64Pks [ index ] = pk . ( * int64PrimaryKey ) . Value
}
2022-04-29 13:35:49 +08:00
ids . IdField = & schemapb . IDs_IntId {
IntId : & schemapb . LongArray {
Data : int64Pks ,
} ,
2022-04-02 17:43:29 +08:00
}
case schemapb . DataType_VarChar :
2022-04-29 13:35:49 +08:00
varCharPks := make ( [ ] string , len ( primaryKeys ) )
for index , pk := range primaryKeys {
varCharPks [ index ] = pk . ( * varCharPrimaryKey ) . Value
}
ids . IdField = & schemapb . IDs_StrId {
StrId : & schemapb . StringArray {
Data : varCharPks ,
} ,
}
2022-04-02 17:43:29 +08:00
default :
return fmt . Errorf ( "invalid data type of primary keys" )
2021-10-22 18:51:14 +08:00
}
2021-10-22 13:11:11 +08:00
2022-05-13 09:53:53 +08:00
idsBlob , err := proto . Marshal ( ids )
if err != nil {
return err
}
2022-04-29 13:35:49 +08:00
loadInfo := C . CLoadDeletedRecordInfo {
2022-05-13 09:53:53 +08:00
timestamps : unsafe . Pointer ( & timestamps [ 0 ] ) ,
primary_keys : ( * C . uint8_t ) ( unsafe . Pointer ( & idsBlob [ 0 ] ) ) ,
primary_keys_size : C . uint64_t ( len ( idsBlob ) ) ,
row_count : C . int64_t ( rowCount ) ,
2022-04-29 13:35:49 +08:00
}
/ *
CStatus
LoadDeletedRecord ( CSegmentInterface c_segment , CLoadDeletedRecordInfo deleted_record_info )
* /
status := C . LoadDeletedRecord ( s . segmentPtr , loadInfo )
if err := HandleCStatus ( & status , "LoadDeletedRecord failed" ) ; err != nil {
return err
}
2022-05-07 10:27:51 +08:00
log . Info ( "load deleted record done" ,
2021-10-22 18:51:14 +08:00
zap . Int64 ( "row count" , rowCount ) ,
zap . Int64 ( "segmentID" , s . ID ( ) ) )
2021-10-22 13:11:11 +08:00
return nil
}
2022-04-29 13:35:49 +08:00
func ( s * Segment ) segmentLoadIndexData ( bytesIndex [ ] [ ] byte , indexInfo * querypb . FieldIndexInfo , fieldType schemapb . DataType ) error {
2021-04-07 18:29:19 +08:00
loadIndexInfo , err := newLoadIndexInfo ( )
defer deleteLoadIndexInfo ( loadIndexInfo )
if err != nil {
return err
}
2022-02-08 21:57:46 +08:00
2022-04-29 13:35:49 +08:00
err = loadIndexInfo . appendIndexInfo ( bytesIndex , indexInfo , fieldType )
2021-04-07 18:29:19 +08:00
if err != nil {
return err
}
2021-03-22 16:36:10 +08:00
if s . segmentPtr == nil {
return errors . New ( "null seg core pointer" )
}
if s . segmentType != segmentTypeSealed {
errMsg := fmt . Sprintln ( "updateSegmentIndex failed, illegal segment type " , s . segmentType , "segmentID = " , s . ID ( ) )
return errors . New ( errMsg )
}
status := C . UpdateSealedSegmentIndex ( s . segmentPtr , loadIndexInfo . cLoadIndexInfo )
2021-11-11 10:18:42 +08:00
if err := HandleCStatus ( & status , "UpdateSealedSegmentIndex failed" ) ; err != nil {
return err
2021-03-22 16:36:10 +08:00
}
2022-05-26 14:58:01 +08:00
log . Info ( "updateSegmentIndex done" , zap . Int64 ( "segmentID" , s . ID ( ) ) , zap . Int64 ( "fieldID" , indexInfo . FieldID ) )
2021-03-22 16:36:10 +08:00
return nil
}