enhance: add ctx for HandleCStatus and callers (#29517)

See also #29516

Make `HandleCStatus` print trace id for better logging

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-12-27 16:10:47 +08:00 committed by GitHub
parent 632d8b3743
commit b251c3a682
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 208 additions and 237 deletions

View File

@ -88,6 +88,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
if growing == nil {
var err error
growing, err = segments.NewSegment(
context.Background(),
sd.collection,
segmentID,
insertData.PartitionID,
@ -107,7 +108,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
}
}
err := growing.Insert(insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord)
err := growing.Insert(context.Background(), insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord)
if err != nil {
log.Error("failed to insert data into growing segment",
zap.Int64("segmentID", segmentID),
@ -334,7 +335,7 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm
}
log.Info("forwarding L0 delete records...", zap.Int("deletionCount", len(deletedPks)))
err = segment.Delete(deletedPks, deletedTss)
err = segment.Delete(ctx, deletedPks, deletedTss)
if err != nil {
log.Warn("failed to forward L0 deletions to growing segment",
zap.Error(err),

View File

@ -235,7 +235,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
ms.EXPECT().Partition().Return(info.GetPartitionID())
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
@ -557,8 +557,8 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
mockErr := merr.WrapErrServiceInternal("mock")
growing0.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil)
growing1.EXPECT().Delete(mock.Anything, mock.Anything).Return(mockErr)
growing0.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
growing1.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
s.loader.EXPECT().Load(
mock.Anything,
@ -768,7 +768,7 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
ms.EXPECT().Collection().Return(info.GetCollectionID())
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})

View File

@ -94,7 +94,7 @@ func (s *DelegatorSuite) SetupTest() {
ms.EXPECT().Collection().Return(info.GetCollectionID())
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
return ms
})
}, nil)

View File

@ -27,20 +27,20 @@ package segments
import "C"
import (
"fmt"
"context"
"unsafe"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/cgoconverter"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// HandleCStatus deals with the error returned from CGO
func HandleCStatus(status *C.CStatus, extraInfo string, fields ...zap.Field) error {
func HandleCStatus(ctx context.Context, status *C.CStatus, extraInfo string, fields ...zap.Field) error {
if status.error_code == 0 {
return nil
}
@ -52,11 +52,11 @@ func HandleCStatus(status *C.CStatus, extraInfo string, fields ...zap.Field) err
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
finalMsg := fmt.Sprintf("%s: %s", errorName, errorMsg)
logMsg := fmt.Sprintf("%s, segcore error: %s\n", extraInfo, finalMsg)
log := log.With().WithOptions(zap.AddCallerSkip(1))
log.Warn(logMsg, fields...)
return errors.New(finalMsg)
log.Ctx(ctx).With(fields...).
WithOptions(zap.AddCallerSkip(1)) // Add caller stack to show HandleCStatus caller
log.Warn("CStatus returns err", zap.String("errorName", errorName), zap.String("errorMsg", errorMsg))
return merr.WrapErrServiceInternal(errorName, errorMsg)
}
// HandleCProto deal with the result proto returned from CGO
@ -84,14 +84,14 @@ func GetCProtoBlob(cProto *C.CProto) []byte {
return blob
}
func GetLocalUsedSize(path string) (int64, error) {
func GetLocalUsedSize(ctx context.Context, path string) (int64, error) {
var availableSize int64
cSize := (*C.int64_t)(&availableSize)
cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
status := C.GetLocalUsedSize(cPath, cSize)
err := HandleCStatus(&status, "get local used size failed")
err := HandleCStatus(ctx, &status, "get local used size failed")
if err != nil {
return 0, err
}

View File

@ -23,6 +23,7 @@ package segments
import "C"
import (
"context"
"unsafe"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -32,11 +33,11 @@ type LoadFieldDataInfo struct {
cLoadFieldDataInfo C.CLoadFieldDataInfo
}
func newLoadFieldDataInfo() (*LoadFieldDataInfo, error) {
func newLoadFieldDataInfo(ctx context.Context) (*LoadFieldDataInfo, error) {
var cLoadFieldDataInfo C.CLoadFieldDataInfo
status := C.NewLoadFieldDataInfo(&cLoadFieldDataInfo)
if err := HandleCStatus(&status, "newLoadFieldDataInfo failed"); err != nil {
if err := HandleCStatus(ctx, &status, "newLoadFieldDataInfo failed"); err != nil {
return nil, err
}
return &LoadFieldDataInfo{cLoadFieldDataInfo: cLoadFieldDataInfo}, nil
@ -46,22 +47,22 @@ func deleteFieldDataInfo(info *LoadFieldDataInfo) {
C.DeleteLoadFieldDataInfo(info.cLoadFieldDataInfo)
}
func (ld *LoadFieldDataInfo) appendLoadFieldInfo(fieldID int64, rowCount int64) error {
func (ld *LoadFieldDataInfo) appendLoadFieldInfo(ctx context.Context, fieldID int64, rowCount int64) error {
cFieldID := C.int64_t(fieldID)
cRowCount := C.int64_t(rowCount)
status := C.AppendLoadFieldInfo(ld.cLoadFieldDataInfo, cFieldID, cRowCount)
return HandleCStatus(&status, "appendLoadFieldInfo failed")
return HandleCStatus(ctx, &status, "appendLoadFieldInfo failed")
}
func (ld *LoadFieldDataInfo) appendLoadFieldDataPath(fieldID int64, binlog *datapb.Binlog) error {
func (ld *LoadFieldDataInfo) appendLoadFieldDataPath(ctx context.Context, fieldID int64, binlog *datapb.Binlog) error {
cFieldID := C.int64_t(fieldID)
cEntriesNum := C.int64_t(binlog.GetEntriesNum())
cFile := C.CString(binlog.GetLogPath())
defer C.free(unsafe.Pointer(cFile))
status := C.AppendLoadFieldDataPath(ld.cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile)
return HandleCStatus(&status, "appendLoadFieldDataPath failed")
return HandleCStatus(ctx, &status, "appendLoadFieldDataPath failed")
}
func (ld *LoadFieldDataInfo) enableMmap(fieldID int64, enabled bool) {

View File

@ -25,6 +25,7 @@ package segments
import "C"
import (
"context"
"unsafe"
"github.com/pingcap/log"
@ -45,11 +46,11 @@ type LoadIndexInfo struct {
}
// newLoadIndexInfo returns a new LoadIndexInfo and error
func newLoadIndexInfo() (*LoadIndexInfo, error) {
func newLoadIndexInfo(ctx context.Context) (*LoadIndexInfo, error) {
var cLoadIndexInfo C.CLoadIndexInfo
status := C.NewLoadIndexInfo(&cLoadIndexInfo)
if err := HandleCStatus(&status, "NewLoadIndexInfo failed"); err != nil {
if err := HandleCStatus(ctx, &status, "NewLoadIndexInfo failed"); err != nil {
return nil, err
}
return &LoadIndexInfo{cLoadIndexInfo: cLoadIndexInfo}, nil
@ -60,7 +61,7 @@ func deleteLoadIndexInfo(info *LoadIndexInfo) {
C.DeleteLoadIndexInfo(info.cLoadIndexInfo)
}
func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error {
func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error {
fieldID := indexInfo.FieldID
indexPaths := indexInfo.IndexFilePaths
@ -70,12 +71,12 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo,
delete(indexParams, common.MmapEnabledKey)
mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
err := li.appendFieldInfo(collectionID, partitionID, segmentID, fieldID, fieldType, enableMmap, mmapDirPath)
err := li.appendFieldInfo(ctx, collectionID, partitionID, segmentID, fieldID, fieldType, enableMmap, mmapDirPath)
if err != nil {
return err
}
err = li.appendIndexInfo(indexInfo.IndexID, indexInfo.BuildID, indexInfo.IndexVersion)
err = li.appendIndexInfo(ctx, indexInfo.IndexID, indexInfo.BuildID, indexInfo.IndexVersion)
if err != nil {
return err
}
@ -95,54 +96,54 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo,
log.Info("load with index params", zap.Any("indexParams", indexParams))
for key, value := range indexParams {
err = li.appendIndexParam(key, value)
err = li.appendIndexParam(ctx, key, value)
if err != nil {
return err
}
}
if err := li.appendIndexEngineVersion(indexInfo.GetCurrentIndexVersion()); err != nil {
if err := li.appendIndexEngineVersion(ctx, indexInfo.GetCurrentIndexVersion()); err != nil {
return err
}
err = li.appendIndexData(indexPaths)
err = li.appendIndexData(ctx, indexPaths)
return err
}
// appendIndexParam append indexParam to index
func (li *LoadIndexInfo) appendIndexParam(indexKey string, indexValue string) error {
func (li *LoadIndexInfo) appendIndexParam(ctx context.Context, indexKey string, indexValue string) error {
cIndexKey := C.CString(indexKey)
defer C.free(unsafe.Pointer(cIndexKey))
cIndexValue := C.CString(indexValue)
defer C.free(unsafe.Pointer(cIndexValue))
status := C.AppendIndexParam(li.cLoadIndexInfo, cIndexKey, cIndexValue)
return HandleCStatus(&status, "AppendIndexParam failed")
return HandleCStatus(ctx, &status, "AppendIndexParam failed")
}
func (li *LoadIndexInfo) appendIndexInfo(indexID int64, buildID int64, indexVersion int64) error {
func (li *LoadIndexInfo) appendIndexInfo(ctx context.Context, indexID int64, buildID int64, indexVersion int64) error {
cIndexID := C.int64_t(indexID)
cBuildID := C.int64_t(buildID)
cIndexVersion := C.int64_t(indexVersion)
status := C.AppendIndexInfo(li.cLoadIndexInfo, cIndexID, cBuildID, cIndexVersion)
return HandleCStatus(&status, "AppendIndexInfo failed")
return HandleCStatus(ctx, &status, "AppendIndexInfo failed")
}
func (li *LoadIndexInfo) cleanLocalData() error {
func (li *LoadIndexInfo) cleanLocalData(ctx context.Context) error {
status := C.CleanLoadedIndex(li.cLoadIndexInfo)
return HandleCStatus(&status, "failed to clean cached data on disk")
return HandleCStatus(ctx, &status, "failed to clean cached data on disk")
}
func (li *LoadIndexInfo) appendIndexFile(filePath string) error {
func (li *LoadIndexInfo) appendIndexFile(ctx context.Context, filePath string) error {
cIndexFilePath := C.CString(filePath)
defer C.free(unsafe.Pointer(cIndexFilePath))
status := C.AppendIndexFilePath(li.cLoadIndexInfo, cIndexFilePath)
return HandleCStatus(&status, "AppendIndexIFile failed")
return HandleCStatus(ctx, &status, "AppendIndexIFile failed")
}
// appendFieldInfo appends fieldID & fieldType to index
func (li *LoadIndexInfo) appendFieldInfo(collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, enableMmap bool, mmapDirPath string) error {
func (li *LoadIndexInfo) appendFieldInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, enableMmap bool, mmapDirPath string) error {
cColID := C.int64_t(collectionID)
cParID := C.int64_t(partitionID)
cSegID := C.int64_t(segmentID)
@ -152,25 +153,25 @@ func (li *LoadIndexInfo) appendFieldInfo(collectionID int64, partitionID int64,
cMmapDirPath := C.CString(mmapDirPath)
defer C.free(unsafe.Pointer(cMmapDirPath))
status := C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType, cEnableMmap, cMmapDirPath)
return HandleCStatus(&status, "AppendFieldInfo failed")
return HandleCStatus(ctx, &status, "AppendFieldInfo failed")
}
// appendIndexData appends index path to cLoadIndexInfo and create index
func (li *LoadIndexInfo) appendIndexData(indexKeys []string) error {
func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string) error {
for _, indexPath := range indexKeys {
err := li.appendIndexFile(indexPath)
err := li.appendIndexFile(ctx, indexPath)
if err != nil {
return err
}
}
status := C.AppendIndexV2(li.cLoadIndexInfo)
return HandleCStatus(&status, "AppendIndex failed")
return HandleCStatus(ctx, &status, "AppendIndex failed")
}
func (li *LoadIndexInfo) appendIndexEngineVersion(indexEngineVersion int32) error {
func (li *LoadIndexInfo) appendIndexEngineVersion(ctx context.Context, indexEngineVersion int32) error {
cIndexEngineVersion := C.int32_t(indexEngineVersion)
status := C.AppendIndexEngineVersionToLoadInfo(li.cLoadIndexInfo, cIndexEngineVersion)
return HandleCStatus(&status, "AppendIndexEngineVersion failed")
return HandleCStatus(ctx, &status, "AppendIndexEngineVersion failed")
}

View File

@ -1,6 +1,7 @@
package segments
import (
"context"
"testing"
"github.com/samber/lo"
@ -44,6 +45,7 @@ func (s *ManagerSuite) SetupTest() {
for i, id := range s.segmentIDs {
schema := GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64)
segment, err := NewSegment(
context.Background(),
NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), querypb.LoadType_LoadCollection),
id,
s.partitionIDs[i],

View File

@ -1141,7 +1141,7 @@ func genHNSWDSL(schema *schemapb.CollectionSchema, ef int, topK int64, roundDeci
>`, nil
}
func checkSearchResult(nq int64, plan *SearchPlan, searchResult *SearchResult) error {
func checkSearchResult(ctx context.Context, nq int64, plan *SearchPlan, searchResult *SearchResult) error {
searchResults := make([]*SearchResult, 0)
searchResults = append(searchResults, searchResult)
@ -1150,13 +1150,13 @@ func checkSearchResult(nq int64, plan *SearchPlan, searchResult *SearchResult) e
sliceTopKs := []int64{topK, topK / 2, topK, topK, topK / 2}
sInfo := ParseSliceInfo(sliceNQs, sliceTopKs, nq)
res, err := ReduceSearchResultsAndFillData(plan, searchResults, 1, sInfo.SliceNQs, sInfo.SliceTopKs)
res, err := ReduceSearchResultsAndFillData(ctx, plan, searchResults, 1, sInfo.SliceNQs, sInfo.SliceTopKs)
if err != nil {
return err
}
for i := 0; i < len(sInfo.SliceNQs); i++ {
blob, err := GetSearchResultDataBlob(res, i)
blob, err := GetSearchResultDataBlob(ctx, res, i)
if err != nil {
return err
}
@ -1199,7 +1199,7 @@ func genSearchPlanAndRequests(collection *Collection, segments []int64, indexTyp
FromShardLeader: true,
Scope: querypb.DataScope_Historical,
}
return NewSearchRequest(collection, queryReq, queryReq.Req.GetPlaceholderGroup())
return NewSearchRequest(context.Background(), collection, queryReq, queryReq.Req.GetPlaceholderGroup())
}
func genInsertMsg(collection *Collection, partitionID, segment int64, numRows int) (*msgstream.InsertMsg, error) {
@ -1301,7 +1301,7 @@ func genSimpleRetrievePlan(collection *Collection) (*RetrievePlan, error) {
return nil, err
}
plan, err2 := NewRetrievePlan(collection, planBytes, timestamp, 100)
plan, err2 := NewRetrievePlan(context.Background(), collection, planBytes, timestamp, 100)
return plan, err2
}

View File

@ -115,13 +115,13 @@ func (_c *MockSegment_Collection_Call) RunAndReturn(run func() int64) *MockSegme
return _c
}
// Delete provides a mock function with given fields: primaryKeys, timestamps
func (_m *MockSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []uint64) error {
ret := _m.Called(primaryKeys, timestamps)
// Delete provides a mock function with given fields: ctx, primaryKeys, timestamps
func (_m *MockSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64) error {
ret := _m.Called(ctx, primaryKeys, timestamps)
var r0 error
if rf, ok := ret.Get(0).(func([]storage.PrimaryKey, []uint64) error); ok {
r0 = rf(primaryKeys, timestamps)
if rf, ok := ret.Get(0).(func(context.Context, []storage.PrimaryKey, []uint64) error); ok {
r0 = rf(ctx, primaryKeys, timestamps)
} else {
r0 = ret.Error(0)
}
@ -135,15 +135,16 @@ type MockSegment_Delete_Call struct {
}
// Delete is a helper method to define mock.On call
// - ctx context.Context
// - primaryKeys []storage.PrimaryKey
// - timestamps []uint64
func (_e *MockSegment_Expecter) Delete(primaryKeys interface{}, timestamps interface{}) *MockSegment_Delete_Call {
return &MockSegment_Delete_Call{Call: _e.mock.On("Delete", primaryKeys, timestamps)}
func (_e *MockSegment_Expecter) Delete(ctx interface{}, primaryKeys interface{}, timestamps interface{}) *MockSegment_Delete_Call {
return &MockSegment_Delete_Call{Call: _e.mock.On("Delete", ctx, primaryKeys, timestamps)}
}
func (_c *MockSegment_Delete_Call) Run(run func(primaryKeys []storage.PrimaryKey, timestamps []uint64)) *MockSegment_Delete_Call {
func (_c *MockSegment_Delete_Call) Run(run func(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64)) *MockSegment_Delete_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].([]storage.PrimaryKey), args[1].([]uint64))
run(args[0].(context.Context), args[1].([]storage.PrimaryKey), args[2].([]uint64))
})
return _c
}
@ -153,7 +154,7 @@ func (_c *MockSegment_Delete_Call) Return(_a0 error) *MockSegment_Delete_Call {
return _c
}
func (_c *MockSegment_Delete_Call) RunAndReturn(run func([]storage.PrimaryKey, []uint64) error) *MockSegment_Delete_Call {
func (_c *MockSegment_Delete_Call) RunAndReturn(run func(context.Context, []storage.PrimaryKey, []uint64) error) *MockSegment_Delete_Call {
_c.Call.Return(run)
return _c
}
@ -370,13 +371,13 @@ func (_c *MockSegment_Indexes_Call) RunAndReturn(run func() []*IndexedFieldInfo)
return _c
}
// Insert provides a mock function with given fields: rowIDs, timestamps, record
func (_m *MockSegment) Insert(rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord) error {
ret := _m.Called(rowIDs, timestamps, record)
// Insert provides a mock function with given fields: ctx, rowIDs, timestamps, record
func (_m *MockSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord) error {
ret := _m.Called(ctx, rowIDs, timestamps, record)
var r0 error
if rf, ok := ret.Get(0).(func([]int64, []uint64, *segcorepb.InsertRecord) error); ok {
r0 = rf(rowIDs, timestamps, record)
if rf, ok := ret.Get(0).(func(context.Context, []int64, []uint64, *segcorepb.InsertRecord) error); ok {
r0 = rf(ctx, rowIDs, timestamps, record)
} else {
r0 = ret.Error(0)
}
@ -390,16 +391,17 @@ type MockSegment_Insert_Call struct {
}
// Insert is a helper method to define mock.On call
// - ctx context.Context
// - rowIDs []int64
// - timestamps []uint64
// - record *segcorepb.InsertRecord
func (_e *MockSegment_Expecter) Insert(rowIDs interface{}, timestamps interface{}, record interface{}) *MockSegment_Insert_Call {
return &MockSegment_Insert_Call{Call: _e.mock.On("Insert", rowIDs, timestamps, record)}
func (_e *MockSegment_Expecter) Insert(ctx interface{}, rowIDs interface{}, timestamps interface{}, record interface{}) *MockSegment_Insert_Call {
return &MockSegment_Insert_Call{Call: _e.mock.On("Insert", ctx, rowIDs, timestamps, record)}
}
func (_c *MockSegment_Insert_Call) Run(run func(rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord)) *MockSegment_Insert_Call {
func (_c *MockSegment_Insert_Call) Run(run func(ctx context.Context, rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord)) *MockSegment_Insert_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].([]int64), args[1].([]uint64), args[2].(*segcorepb.InsertRecord))
run(args[0].(context.Context), args[1].([]int64), args[2].([]uint64), args[3].(*segcorepb.InsertRecord))
})
return _c
}
@ -409,7 +411,7 @@ func (_c *MockSegment_Insert_Call) Return(_a0 error) *MockSegment_Insert_Call {
return _c
}
func (_c *MockSegment_Insert_Call) RunAndReturn(run func([]int64, []uint64, *segcorepb.InsertRecord) error) *MockSegment_Insert_Call {
func (_c *MockSegment_Insert_Call) RunAndReturn(run func(context.Context, []int64, []uint64, *segcorepb.InsertRecord) error) *MockSegment_Insert_Call {
_c.Call.Return(run)
return _c
}
@ -537,13 +539,13 @@ func (_c *MockSegment_Level_Call) RunAndReturn(run func() datapb.SegmentLevel) *
return _c
}
// LoadDeltaData provides a mock function with given fields: deltaData
func (_m *MockSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
ret := _m.Called(deltaData)
// LoadDeltaData provides a mock function with given fields: ctx, deltaData
func (_m *MockSegment) LoadDeltaData(ctx context.Context, deltaData *storage.DeleteData) error {
ret := _m.Called(ctx, deltaData)
var r0 error
if rf, ok := ret.Get(0).(func(*storage.DeleteData) error); ok {
r0 = rf(deltaData)
if rf, ok := ret.Get(0).(func(context.Context, *storage.DeleteData) error); ok {
r0 = rf(ctx, deltaData)
} else {
r0 = ret.Error(0)
}
@ -557,14 +559,15 @@ type MockSegment_LoadDeltaData_Call struct {
}
// LoadDeltaData is a helper method to define mock.On call
// - ctx context.Context
// - deltaData *storage.DeleteData
func (_e *MockSegment_Expecter) LoadDeltaData(deltaData interface{}) *MockSegment_LoadDeltaData_Call {
return &MockSegment_LoadDeltaData_Call{Call: _e.mock.On("LoadDeltaData", deltaData)}
func (_e *MockSegment_Expecter) LoadDeltaData(ctx interface{}, deltaData interface{}) *MockSegment_LoadDeltaData_Call {
return &MockSegment_LoadDeltaData_Call{Call: _e.mock.On("LoadDeltaData", ctx, deltaData)}
}
func (_c *MockSegment_LoadDeltaData_Call) Run(run func(deltaData *storage.DeleteData)) *MockSegment_LoadDeltaData_Call {
func (_c *MockSegment_LoadDeltaData_Call) Run(run func(ctx context.Context, deltaData *storage.DeleteData)) *MockSegment_LoadDeltaData_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*storage.DeleteData))
run(args[0].(context.Context), args[1].(*storage.DeleteData))
})
return _c
}
@ -574,7 +577,7 @@ func (_c *MockSegment_LoadDeltaData_Call) Return(_a0 error) *MockSegment_LoadDel
return _c
}
func (_c *MockSegment_LoadDeltaData_Call) RunAndReturn(run func(*storage.DeleteData) error) *MockSegment_LoadDeltaData_Call {
func (_c *MockSegment_LoadDeltaData_Call) RunAndReturn(run func(context.Context, *storage.DeleteData) error) *MockSegment_LoadDeltaData_Call {
_c.Call.Return(run)
return _c
}

View File

@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
querypb "github.com/milvus-io/milvus/internal/proto/querypb"
storage "github.com/milvus-io/milvus/internal/storage"
)
// MockSegmentManager is an autogenerated mock type for the SegmentManager type
@ -378,61 +376,6 @@ func (_c *MockSegmentManager_GetGrowing_Call) RunAndReturn(run func(int64) Segme
return _c
}
// GetL0DeleteRecords provides a mock function with given fields:
func (_m *MockSegmentManager) GetL0DeleteRecords() ([]storage.PrimaryKey, []uint64) {
ret := _m.Called()
var r0 []storage.PrimaryKey
var r1 []uint64
if rf, ok := ret.Get(0).(func() ([]storage.PrimaryKey, []uint64)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() []storage.PrimaryKey); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]storage.PrimaryKey)
}
}
if rf, ok := ret.Get(1).(func() []uint64); ok {
r1 = rf()
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([]uint64)
}
}
return r0, r1
}
// MockSegmentManager_GetL0DeleteRecords_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetL0DeleteRecords'
type MockSegmentManager_GetL0DeleteRecords_Call struct {
*mock.Call
}
// GetL0DeleteRecords is a helper method to define mock.On call
func (_e *MockSegmentManager_Expecter) GetL0DeleteRecords() *MockSegmentManager_GetL0DeleteRecords_Call {
return &MockSegmentManager_GetL0DeleteRecords_Call{Call: _e.mock.On("GetL0DeleteRecords")}
}
func (_c *MockSegmentManager_GetL0DeleteRecords_Call) Run(run func()) *MockSegmentManager_GetL0DeleteRecords_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSegmentManager_GetL0DeleteRecords_Call) Return(_a0 []storage.PrimaryKey, _a1 []uint64) *MockSegmentManager_GetL0DeleteRecords_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockSegmentManager_GetL0DeleteRecords_Call) RunAndReturn(run func() ([]storage.PrimaryKey, []uint64)) *MockSegmentManager_GetL0DeleteRecords_Call {
_c.Call.Return(run)
return _c
}
// GetSealed provides a mock function with given fields: segmentID
func (_m *MockSegmentManager) GetSealed(segmentID int64) Segment {
ret := _m.Called(segmentID)

View File

@ -26,6 +26,7 @@ package segments
import "C"
import (
"context"
"fmt"
"unsafe"
@ -41,14 +42,14 @@ type SearchPlan struct {
cSearchPlan C.CSearchPlan
}
func createSearchPlanByExpr(col *Collection, expr []byte, metricType string) (*SearchPlan, error) {
func createSearchPlanByExpr(ctx context.Context, col *Collection, expr []byte, metricType string) (*SearchPlan, error) {
if col.collectionPtr == nil {
return nil, errors.New("nil collection ptr, collectionID = " + fmt.Sprintln(col.id))
}
var cPlan C.CSearchPlan
status := C.CreateSearchPlanByExpr(col.collectionPtr, unsafe.Pointer(&expr[0]), (C.int64_t)(len(expr)), &cPlan)
err1 := HandleCStatus(&status, "Create Plan by expr failed")
err1 := HandleCStatus(ctx, &status, "Create Plan by expr failed")
if err1 != nil {
return nil, err1
}
@ -91,12 +92,12 @@ type SearchRequest struct {
searchFieldID UniqueID
}
func NewSearchRequest(collection *Collection, req *querypb.SearchRequest, placeholderGrp []byte) (*SearchRequest, error) {
func NewSearchRequest(ctx context.Context, collection *Collection, req *querypb.SearchRequest, placeholderGrp []byte) (*SearchRequest, error) {
var err error
var plan *SearchPlan
metricType := req.GetReq().GetMetricType()
expr := req.Req.SerializedExprPlan
plan, err = createSearchPlanByExpr(collection, expr, metricType)
plan, err = createSearchPlanByExpr(ctx, collection, expr, metricType)
if err != nil {
return nil, err
}
@ -111,14 +112,14 @@ func NewSearchRequest(collection *Collection, req *querypb.SearchRequest, placeh
var cPlaceholderGroup C.CPlaceholderGroup
status := C.ParsePlaceholderGroup(plan.cSearchPlan, blobPtr, blobSize, &cPlaceholderGroup)
if err := HandleCStatus(&status, "parser searchRequest failed"); err != nil {
if err := HandleCStatus(ctx, &status, "parser searchRequest failed"); err != nil {
plan.delete()
return nil, err
}
var fieldID C.int64_t
status = C.GetFieldID(plan.cSearchPlan, &fieldID)
if err = HandleCStatus(&status, "get fieldID from plan failed"); err != nil {
if err = HandleCStatus(ctx, &status, "get fieldID from plan failed"); err != nil {
plan.delete()
return nil, err
}
@ -149,7 +150,7 @@ func (req *SearchRequest) Delete() {
C.DeletePlaceholderGroup(req.cPlaceholderGroup)
}
func parseSearchRequest(plan *SearchPlan, searchRequestBlob []byte) (*SearchRequest, error) {
func parseSearchRequest(ctx context.Context, plan *SearchPlan, searchRequestBlob []byte) (*SearchRequest, error) {
if len(searchRequestBlob) == 0 {
return nil, fmt.Errorf("empty search request")
}
@ -158,7 +159,7 @@ func parseSearchRequest(plan *SearchPlan, searchRequestBlob []byte) (*SearchRequ
var cPlaceholderGroup C.CPlaceholderGroup
status := C.ParsePlaceholderGroup(plan.cSearchPlan, blobPtr, blobSize, &cPlaceholderGroup)
if err := HandleCStatus(&status, "parser searchRequest failed"); err != nil {
if err := HandleCStatus(ctx, &status, "parser searchRequest failed"); err != nil {
return nil, err
}
@ -173,7 +174,7 @@ type RetrievePlan struct {
msgID UniqueID // only used to debug.
}
func NewRetrievePlan(col *Collection, expr []byte, timestamp Timestamp, msgID UniqueID) (*RetrievePlan, error) {
func NewRetrievePlan(ctx context.Context, col *Collection, expr []byte, timestamp Timestamp, msgID UniqueID) (*RetrievePlan, error) {
col.mu.RLock()
defer col.mu.RUnlock()
@ -184,7 +185,7 @@ func NewRetrievePlan(col *Collection, expr []byte, timestamp Timestamp, msgID Un
var cPlan C.CRetrievePlan
status := C.CreateRetrievePlanByExpr(col.collectionPtr, unsafe.Pointer(&expr[0]), (C.int64_t)(len(expr)), &cPlan)
err := HandleCStatus(&status, "Create retrieve plan by expr failed")
err := HandleCStatus(ctx, &status, "Create retrieve plan by expr failed")
if err != nil {
return nil, err
}

View File

@ -17,6 +17,7 @@
package segments
import (
"context"
"testing"
"github.com/golang/protobuf/proto"
@ -57,7 +58,7 @@ func (suite *PlanSuite) TestPlanCreateByExpr() {
expr, err := proto.Marshal(planNode)
suite.NoError(err)
_, err = createSearchPlanByExpr(suite.collection, expr, "")
_, err = createSearchPlanByExpr(context.Background(), suite.collection, expr, "")
suite.Error(err)
}
@ -66,13 +67,13 @@ func (suite *PlanSuite) TestPlanFail() {
id: -1,
}
_, err := createSearchPlanByExpr(collection, nil, "")
_, err := createSearchPlanByExpr(context.Background(), collection, nil, "")
suite.Error(err)
}
func (suite *PlanSuite) TestQueryPlanCollectionReleased() {
collection := &Collection{id: suite.collectionID}
_, err := NewRetrievePlan(collection, nil, 0, 0)
_, err := NewRetrievePlan(context.Background(), collection, nil, 0, 0)
suite.Error(err)
}

View File

@ -25,6 +25,7 @@ package segments
import "C"
import (
"context"
"fmt"
)
@ -70,7 +71,7 @@ func ParseSliceInfo(originNQs []int64, originTopKs []int64, nqPerSlice int64) *S
return sInfo
}
func ReduceSearchResultsAndFillData(plan *SearchPlan, searchResults []*SearchResult,
func ReduceSearchResultsAndFillData(ctx context.Context, plan *SearchPlan, searchResults []*SearchResult,
numSegments int64, sliceNQs []int64, sliceTopKs []int64,
) (searchResultDataBlobs, error) {
if plan.cSearchPlan == nil {
@ -100,16 +101,16 @@ func ReduceSearchResultsAndFillData(plan *SearchPlan, searchResults []*SearchRes
var cSearchResultDataBlobs searchResultDataBlobs
status := C.ReduceSearchResultsAndFillData(&cSearchResultDataBlobs, plan.cSearchPlan, cSearchResultPtr,
cNumSegments, cSliceNQSPtr, cSliceTopKSPtr, cNumSlices)
if err := HandleCStatus(&status, "ReduceSearchResultsAndFillData failed"); err != nil {
if err := HandleCStatus(ctx, &status, "ReduceSearchResultsAndFillData failed"); err != nil {
return nil, err
}
return cSearchResultDataBlobs, nil
}
func GetSearchResultDataBlob(cSearchResultDataBlobs searchResultDataBlobs, blobIndex int) ([]byte, error) {
func GetSearchResultDataBlob(ctx context.Context, cSearchResultDataBlobs searchResultDataBlobs, blobIndex int) ([]byte, error) {
var blob C.CProto
status := C.GetSearchResultDataBlob(&blob, cSearchResultDataBlobs, C.int32_t(blobIndex))
if err := HandleCStatus(&status, "marshal failed"); err != nil {
if err := HandleCStatus(ctx, &status, "marshal failed"); err != nil {
return nil, err
}
return GetCProtoBlob(&blob), nil

View File

@ -72,7 +72,8 @@ func (suite *ReduceSuite) SetupTest() {
GenTestIndexMeta(suite.collectionID, schema),
querypb.LoadType_LoadCollection,
)
suite.segment, err = NewSegment(suite.collection,
suite.segment, err = NewSegment(ctx,
suite.collection,
suite.segmentID,
suite.partitionID,
suite.collectionID,
@ -95,7 +96,7 @@ func (suite *ReduceSuite) SetupTest() {
)
suite.Require().NoError(err)
for _, binlog := range binlogs {
err = suite.segment.(*LocalSegment).LoadFieldData(binlog.FieldID, int64(msgLength), binlog, false)
err = suite.segment.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false)
suite.Require().NoError(err)
}
}
@ -164,29 +165,29 @@ func (suite *ReduceSuite) TestReduceAllFunc() {
proto.UnmarshalText(planStr, &planpb)
serializedPlan, err := proto.Marshal(&planpb)
suite.NoError(err)
plan, err := createSearchPlanByExpr(suite.collection, serializedPlan, "")
plan, err := createSearchPlanByExpr(context.Background(), suite.collection, serializedPlan, "")
suite.NoError(err)
searchReq, err := parseSearchRequest(plan, placeGroupByte)
searchReq, err := parseSearchRequest(context.Background(), plan, placeGroupByte)
suite.NoError(err)
defer searchReq.Delete()
searchResult, err := suite.segment.Search(context.Background(), searchReq)
suite.NoError(err)
err = checkSearchResult(nq, plan, searchResult)
err = checkSearchResult(context.Background(), nq, plan, searchResult)
suite.NoError(err)
}
func (suite *ReduceSuite) TestReduceInvalid() {
plan := &SearchPlan{}
_, err := ReduceSearchResultsAndFillData(plan, nil, 1, nil, nil)
_, err := ReduceSearchResultsAndFillData(context.Background(), plan, nil, 1, nil, nil)
suite.Error(err)
searchReq, err := genSearchPlanAndRequests(suite.collection, []int64{suite.segmentID}, IndexHNSW, 10)
suite.NoError(err)
searchResults := make([]*SearchResult, 0)
searchResults = append(searchResults, nil)
_, err = ReduceSearchResultsAndFillData(searchReq.plan, searchResults, 1, []int64{10}, []int64{10})
_, err = ReduceSearchResultsAndFillData(context.Background(), searchReq.plan, searchResults, 1, []int64{10}, []int64{10})
suite.Error(err)
}

View File

@ -83,7 +83,8 @@ func (suite *RetrieveSuite) SetupTest() {
)
suite.collection = suite.manager.Collection.Get(suite.collectionID)
suite.sealed, err = NewSegment(suite.collection,
suite.sealed, err = NewSegment(ctx,
suite.collection,
suite.segmentID,
suite.partitionID,
suite.collectionID,
@ -106,11 +107,12 @@ func (suite *RetrieveSuite) SetupTest() {
)
suite.Require().NoError(err)
for _, binlog := range binlogs {
err = suite.sealed.(*LocalSegment).LoadFieldData(binlog.FieldID, int64(msgLength), binlog, false)
err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false)
suite.Require().NoError(err)
}
suite.growing, err = NewSegment(suite.collection,
suite.growing, err = NewSegment(ctx,
suite.collection,
suite.segmentID+1,
suite.partitionID,
suite.collectionID,
@ -127,7 +129,7 @@ func (suite *RetrieveSuite) SetupTest() {
suite.Require().NoError(err)
insertRecord, err := storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg)
suite.Require().NoError(err)
err = suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
err = suite.growing.Insert(ctx, insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
suite.Require().NoError(err)
suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed)

View File

@ -74,7 +74,8 @@ func (suite *SearchSuite) SetupTest() {
)
suite.collection = suite.manager.Collection.Get(suite.collectionID)
suite.sealed, err = NewSegment(suite.collection,
suite.sealed, err = NewSegment(ctx,
suite.collection,
suite.segmentID,
suite.partitionID,
suite.collectionID,
@ -97,11 +98,12 @@ func (suite *SearchSuite) SetupTest() {
)
suite.Require().NoError(err)
for _, binlog := range binlogs {
err = suite.sealed.(*LocalSegment).LoadFieldData(binlog.FieldID, int64(msgLength), binlog, false)
err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false)
suite.Require().NoError(err)
}
suite.growing, err = NewSegment(suite.collection,
suite.growing, err = NewSegment(ctx,
suite.collection,
suite.segmentID+1,
suite.partitionID,
suite.collectionID,
@ -118,7 +120,7 @@ func (suite *SearchSuite) SetupTest() {
suite.Require().NoError(err)
insertRecord, err := storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg)
suite.Require().NoError(err)
suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
suite.growing.Insert(ctx, insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed)
suite.manager.Segment.Put(SegmentTypeGrowing, suite.growing)

View File

@ -157,7 +157,8 @@ type LocalSegment struct {
fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo]
}
func NewSegment(collection *Collection,
func NewSegment(ctx context.Context,
collection *Collection,
segmentID int64,
partitionID int64,
collectionID int64,
@ -168,6 +169,7 @@ func NewSegment(collection *Collection,
deltaPosition *msgpb.MsgPosition,
level datapb.SegmentLevel,
) (Segment, error) {
log := log.Ctx(ctx)
/*
CStatus
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type, CSegmentInterface* newSegment);
@ -188,7 +190,7 @@ func NewSegment(collection *Collection,
var newPtr C.CSegmentInterface
_, err := GetDynamicPool().Submit(func() (any, error) {
status := C.NewSegment(collection.collectionPtr, cSegType, C.int64_t(segmentID), &newPtr)
err := HandleCStatus(&status, "NewSegmentFailed",
err := HandleCStatus(ctx, &status, "NewSegmentFailed",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
@ -387,7 +389,7 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil, nil
}).Await()
if err := HandleCStatus(&status, "Search failed",
if err := HandleCStatus(ctx, &status, "Search failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.typ.String())); err != nil {
@ -442,7 +444,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco
return nil, nil
}).Await()
if err := HandleCStatus(&status, "Retrieve failed",
if err := HandleCStatus(ctx, &status, "Retrieve failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
@ -479,7 +481,7 @@ func (s *LocalSegment) GetFieldDataPath(index *IndexedFieldInfo, offset int64) (
}
// -------------------------------------------------------------------------------------- interfaces for growing segment
func (s *LocalSegment) preInsert(numOfRecords int) (int64, error) {
func (s *LocalSegment) preInsert(ctx context.Context, numOfRecords int) (int64, error) {
/*
long int
PreInsert(CSegmentInterface c_segment, long int size);
@ -492,13 +494,13 @@ func (s *LocalSegment) preInsert(numOfRecords int) (int64, error) {
status = C.PreInsert(s.ptr, C.int64_t(int64(numOfRecords)), cOffset)
return nil, nil
}).Await()
if err := HandleCStatus(&status, "PreInsert failed"); err != nil {
if err := HandleCStatus(ctx, &status, "PreInsert failed"); err != nil {
return 0, err
}
return offset, nil
}
func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error {
func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error {
if s.Type() != SegmentTypeGrowing {
return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.typ.String())
}
@ -510,7 +512,7 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
}
offset, err := s.preInsert(len(rowIDs))
offset, err := s.preInsert(ctx, len(rowIDs))
if err != nil {
return err
}
@ -539,7 +541,7 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r
)
return nil, nil
}).Await()
if err := HandleCStatus(&status, "Insert failed"); err != nil {
if err := HandleCStatus(ctx, &status, "Insert failed"); err != nil {
return err
}
@ -556,7 +558,7 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r
return nil
}
func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error {
func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error {
/*
CStatus
Delete(CSegmentInterface c_segment,
@ -624,7 +626,7 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ
return nil, nil
}).Await()
if err := HandleCStatus(&status, "Delete failed"); err != nil {
if err := HandleCStatus(ctx, &status, "Delete failed"); err != nil {
return err
}
@ -635,7 +637,7 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ
}
// -------------------------------------------------------------------------------------- interfaces for sealed segment
func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.FieldBinlog) error {
func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error {
s.ptrLock.RLock()
defer s.ptrLock.RUnlock()
@ -643,13 +645,13 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
}
log := log.With(
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
)
loadFieldDataInfo, err := newLoadFieldDataInfo()
loadFieldDataInfo, err := newLoadFieldDataInfo(ctx)
defer deleteFieldDataInfo(loadFieldDataInfo)
if err != nil {
return err
@ -657,13 +659,13 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
for _, field := range fields {
fieldID := field.FieldID
err = loadFieldDataInfo.appendLoadFieldInfo(fieldID, rowCount)
err = loadFieldDataInfo.appendLoadFieldInfo(ctx, fieldID, rowCount)
if err != nil {
return err
}
for _, binlog := range field.Binlogs {
err = loadFieldDataInfo.appendLoadFieldDataPath(fieldID, binlog)
err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog)
if err != nil {
return err
}
@ -677,7 +679,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
return nil, nil
}).Await()
if err := HandleCStatus(&status, "LoadMultiFieldData failed",
if err := HandleCStatus(ctx, &status, "LoadMultiFieldData failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID())); err != nil {
@ -692,7 +694,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
return nil
}
func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datapb.FieldBinlog, mmapEnabled bool) error {
func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog, mmapEnabled bool) error {
s.ptrLock.RLock()
defer s.ptrLock.RUnlock()
@ -700,7 +702,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
}
log := log.With(
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
@ -709,19 +711,19 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
)
log.Info("start loading field data for field")
loadFieldDataInfo, err := newLoadFieldDataInfo()
loadFieldDataInfo, err := newLoadFieldDataInfo(ctx)
defer deleteFieldDataInfo(loadFieldDataInfo)
if err != nil {
return err
}
err = loadFieldDataInfo.appendLoadFieldInfo(fieldID, rowCount)
err = loadFieldDataInfo.appendLoadFieldInfo(ctx, fieldID, rowCount)
if err != nil {
return err
}
for _, binlog := range field.Binlogs {
err = loadFieldDataInfo.appendLoadFieldDataPath(fieldID, binlog)
err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog)
if err != nil {
return err
}
@ -735,7 +737,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
return nil, nil
}).Await()
if err := HandleCStatus(&status, "LoadFieldData failed",
if err := HandleCStatus(ctx, &status, "LoadFieldData failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
@ -749,7 +751,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
return nil
}
func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBinlog) error {
func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error {
s.ptrLock.RLock()
defer s.ptrLock.RUnlock()
@ -757,14 +759,14 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
}
log := log.With(
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("row count", rowCount),
)
loadFieldDataInfo, err := newLoadFieldDataInfo()
loadFieldDataInfo, err := newLoadFieldDataInfo(ctx)
defer deleteFieldDataInfo(loadFieldDataInfo)
if err != nil {
return err
@ -772,13 +774,13 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi
for _, field := range fields {
fieldID := field.FieldID
err = loadFieldDataInfo.appendLoadFieldInfo(fieldID, rowCount)
err = loadFieldDataInfo.appendLoadFieldInfo(ctx, fieldID, rowCount)
if err != nil {
return err
}
for _, binlog := range field.Binlogs {
err = loadFieldDataInfo.appendLoadFieldDataPath(fieldID, binlog)
err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog)
if err != nil {
return err
}
@ -790,7 +792,7 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi
status = C.AddFieldDataInfoForSealed(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
return nil, nil
}).Await()
if err := HandleCStatus(&status, "AddFieldDataInfo failed",
if err := HandleCStatus(ctx, &status, "AddFieldDataInfo failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID())); err != nil {
@ -801,7 +803,7 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi
return nil
}
func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.DeleteData) error {
pks, tss := deltaData.Pks, deltaData.Tss
rowNum := deltaData.RowCount
@ -812,7 +814,7 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
}
log := log.With(
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
@ -866,7 +868,7 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
return nil, nil
}).Await()
if err := HandleCStatus(&status, "LoadDeletedRecord failed",
if err := HandleCStatus(ctx, &status, "LoadDeletedRecord failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID())); err != nil {
@ -882,16 +884,16 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
return nil
}
func (s *LocalSegment) LoadIndex(indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error {
loadIndexInfo, err := newLoadIndexInfo()
func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error {
loadIndexInfo, err := newLoadIndexInfo(ctx)
defer deleteLoadIndexInfo(loadIndexInfo)
if err != nil {
return err
}
err = loadIndexInfo.appendLoadIndexInfo(indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType)
err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType)
if err != nil {
if loadIndexInfo.cleanLocalData() != nil {
if loadIndexInfo.cleanLocalData(ctx) != nil {
log.Warn("failed to clean cached data on disk after append index failed",
zap.Int64("buildID", indexInfo.BuildID),
zap.Int64("index version", indexInfo.IndexVersion))
@ -903,10 +905,10 @@ func (s *LocalSegment) LoadIndex(indexInfo *querypb.FieldIndexInfo, fieldType sc
return errors.New(errMsg)
}
return s.LoadIndexInfo(indexInfo, loadIndexInfo)
return s.LoadIndexInfo(ctx, indexInfo, loadIndexInfo)
}
func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error {
func (s *LocalSegment) LoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error {
log := log.With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
@ -926,7 +928,7 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo
return nil, nil
}).Await()
if err := HandleCStatus(&status, "UpdateSealedSegmentIndex failed",
if err := HandleCStatus(ctx, &status, "UpdateSealedSegmentIndex failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
@ -938,7 +940,7 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo
return nil
}
func (s *LocalSegment) UpdateFieldRawDataSize(numRows int64, fieldBinlog *datapb.FieldBinlog) error {
func (s *LocalSegment) UpdateFieldRawDataSize(ctx context.Context, numRows int64, fieldBinlog *datapb.FieldBinlog) error {
var status C.CStatus
fieldID := fieldBinlog.FieldID
fieldDataSize := int64(0)
@ -950,7 +952,7 @@ func (s *LocalSegment) UpdateFieldRawDataSize(numRows int64, fieldBinlog *datapb
return nil, nil
}).Await()
if err := HandleCStatus(&status, "updateFieldRawDataSize failed"); err != nil {
if err := HandleCStatus(ctx, &status, "updateFieldRawDataSize failed"); err != nil {
return err
}

View File

@ -54,9 +54,9 @@ type Segment interface {
HasRawData(fieldID int64) bool
// Modification related
Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error
Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error
LoadDeltaData(deltaData *storage.DeleteData) error
Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error
Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error
LoadDeltaData(ctx context.Context, deltaData *storage.DeleteData) error
LastDeltaTimestamp() uint64
Release()

View File

@ -131,15 +131,15 @@ func (s *L0Segment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorep
return nil, nil
}
func (s *L0Segment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error {
func (s *L0Segment) Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error {
return merr.WrapErrIoFailedReason("insert not supported for L0 segment")
}
func (s *L0Segment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error {
func (s *L0Segment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error {
return merr.WrapErrIoFailedReason("delete not supported for L0 segment")
}
func (s *L0Segment) LoadDeltaData(deltaData *storage.DeleteData) error {
func (s *L0Segment) LoadDeltaData(ctx context.Context, deltaData *storage.DeleteData) error {
s.dataGuard.Lock()
defer s.dataGuard.Unlock()

View File

@ -213,6 +213,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
}
segment, err := NewSegment(
ctx,
collection,
segmentID,
partitionID,
@ -366,7 +367,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
memoryUsage := hardware.GetUsedMemoryCount()
totalMemory := hardware.GetMemoryCount()
diskUsage, err := GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue())
diskUsage, err := GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
return resource, 0, errors.Wrap(err, "get local used size failed")
}
@ -583,7 +584,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
}
if !typeutil.IsVectorType(field.GetDataType()) && !segment.HasRawData(fieldID) {
log.Info("field index doesn't include raw data, load binlog...", zap.Int64("fieldID", fieldID), zap.String("index", info.IndexInfo.GetIndexName()))
if err = segment.LoadFieldData(fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, true); err != nil {
if err = segment.LoadFieldData(ctx, fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, true); err != nil {
log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err))
return err
}
@ -592,7 +593,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
if err := loader.loadSealedSegmentFields(ctx, segment, fieldBinlogs, loadInfo.GetNumOfRows()); err != nil {
return err
}
if err := segment.AddFieldDataInfo(loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
return err
}
// https://github.com/milvus-io/milvus/23654
@ -601,7 +602,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
return err
}
} else {
if err := segment.LoadMultiFieldData(loadInfo.GetNumOfRows(), loadInfo.BinlogPaths); err != nil {
if err := segment.LoadMultiFieldData(ctx, loadInfo.GetNumOfRows(), loadInfo.BinlogPaths); err != nil {
return err
}
}
@ -651,7 +652,8 @@ func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segmen
fieldBinLog := field
fieldID := field.FieldID
runningGroup.Go(func() error {
return segment.LoadFieldData(fieldID,
return segment.LoadFieldData(ctx,
fieldID,
rowCount,
fieldBinLog,
common.IsFieldMmapEnabled(collection.Schema(), fieldID),
@ -701,7 +703,7 @@ func (loader *segmentLoader) loadFieldsIndex(ctx context.Context,
return err
}
if typeutil.IsVariableDataType(field.GetDataType()) {
err = segment.UpdateFieldRawDataSize(numRows, fieldInfo.FieldBinlog)
err = segment.UpdateFieldRawDataSize(ctx, numRows, fieldInfo.FieldBinlog)
if err != nil {
return err
}
@ -731,7 +733,7 @@ func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalS
return merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load field index")
}
return segment.LoadIndex(indexInfo, fieldType)
return segment.LoadIndex(ctx, indexInfo, fieldType)
}
func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet,
@ -785,7 +787,7 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6
}
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
log := log.With(
log := log.Ctx(ctx).With(
zap.Int64("segmentID", segment.ID()),
)
dCodec := storage.DeleteCodec{}
@ -817,7 +819,7 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
return err
}
err = segment.LoadDeltaData(deltaData)
err = segment.LoadDeltaData(ctx, deltaData)
if err != nil {
return err
}
@ -936,7 +938,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
return 0, 0, errors.New("get memory failed when checkSegmentSize")
}
localDiskUsage, err := GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue())
localDiskUsage, err := GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
return 0, 0, errors.Wrap(err, "get local used size failed")
}

View File

@ -61,7 +61,8 @@ func (suite *SegmentSuite) SetupTest() {
)
suite.collection = suite.manager.Collection.Get(suite.collectionID)
suite.sealed, err = NewSegment(suite.collection,
suite.sealed, err = NewSegment(ctx,
suite.collection,
suite.segmentID,
suite.partitionID,
suite.collectionID,
@ -84,11 +85,12 @@ func (suite *SegmentSuite) SetupTest() {
)
suite.Require().NoError(err)
for _, binlog := range binlogs {
err = suite.sealed.(*LocalSegment).LoadFieldData(binlog.FieldID, int64(msgLength), binlog, false)
err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false)
suite.Require().NoError(err)
}
suite.growing, err = NewSegment(suite.collection,
suite.growing, err = NewSegment(ctx,
suite.collection,
suite.segmentID+1,
suite.partitionID,
suite.collectionID,
@ -105,7 +107,7 @@ func (suite *SegmentSuite) SetupTest() {
suite.Require().NoError(err)
insertRecord, err := storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg)
suite.Require().NoError(err)
err = suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
err = suite.growing.Insert(ctx, insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
suite.Require().NoError(err)
suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed)
@ -121,12 +123,14 @@ func (suite *SegmentSuite) TearDownTest() {
}
func (suite *SegmentSuite) TestDelete() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pks, err := storage.GenInt64PrimaryKeys(0, 1)
suite.NoError(err)
// Test for sealed
rowNum := suite.sealed.RowNum()
err = suite.sealed.Delete(pks, []uint64{1000, 1000})
err = suite.sealed.Delete(ctx, pks, []uint64{1000, 1000})
suite.NoError(err)
suite.Equal(rowNum-int64(len(pks)), suite.sealed.RowNum())
@ -134,7 +138,7 @@ func (suite *SegmentSuite) TestDelete() {
// Test for growing
rowNum = suite.growing.RowNum()
err = suite.growing.Delete(pks, []uint64{1000, 1000})
err = suite.growing.Delete(ctx, pks, []uint64{1000, 1000})
suite.NoError(err)
suite.Equal(rowNum-int64(len(pks)), suite.growing.RowNum())

View File

@ -295,7 +295,7 @@ func (node *QueryNode) Init() error {
node.factory.Init(paramtable.Get())
localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
localUsedSize, err := segments.GetLocalUsedSize(localRootPath)
localUsedSize, err := segments.GetLocalUsedSize(node.ctx, localRootPath)
if err != nil {
log.Warn("get local used size failed", zap.Error(err))
initError = err

View File

@ -221,6 +221,7 @@ func (suite *QueryNodeSuite) TestStop() {
schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64)
collection := segments.NewCollection(1, schema, nil, querypb.LoadType_LoadCollection)
segment, err := segments.NewSegment(
context.Background(),
collection,
100,
10,

View File

@ -1465,7 +1465,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys())
for _, segment := range segments {
err := segment.Delete(pks, req.GetTimestamps())
err := segment.Delete(ctx, pks, req.GetTimestamps())
if err != nil {
log.Warn("segment delete failed", zap.Error(err))
return merr.Status(err), nil

View File

@ -48,6 +48,7 @@ func (t *QueryStreamTask) PreExecute() error {
func (t *QueryStreamTask) Execute() error {
retrievePlan, err := segments.NewRetrievePlan(
t.ctx,
t.collection,
t.req.Req.GetSerializedExprPlan(),
t.req.Req.GetMvccTimestamp(),

View File

@ -80,6 +80,7 @@ func (t *QueryTask) Execute() error {
tr := timerecord.NewTimeRecorderWithTrace(t.ctx, "QueryTask")
retrievePlan, err := segments.NewRetrievePlan(
t.ctx,
t.collection,
t.req.Req.GetSerializedExprPlan(),
t.req.Req.GetMvccTimestamp(),

View File

@ -118,7 +118,7 @@ func (t *SearchTask) Execute() error {
req := t.req
t.combinePlaceHolderGroups()
searchReq, err := segments.NewSearchRequest(t.collection, req, t.placeholderGroup)
searchReq, err := segments.NewSearchRequest(t.ctx, t.collection, req, t.placeholderGroup)
if err != nil {
return err
}
@ -182,6 +182,7 @@ func (t *SearchTask) Execute() error {
tr.RecordSpan()
blobs, err := segments.ReduceSearchResultsAndFillData(
t.ctx,
searchReq.Plan(),
results,
int64(len(results)),
@ -199,7 +200,7 @@ func (t *SearchTask) Execute() error {
metrics.ReduceSegments).
Observe(float64(tr.RecordSpan().Milliseconds()))
for i := range t.originNqs {
blob, err := segments.GetSearchResultDataBlob(blobs, i)
blob, err := segments.GetSearchResultDataBlob(t.ctx, blobs, i)
if err != nil {
return err
}