diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 0a4313b129..c6ba6e2437 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -88,6 +88,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { if growing == nil { var err error growing, err = segments.NewSegment( + context.Background(), sd.collection, segmentID, insertData.PartitionID, @@ -107,7 +108,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { } } - err := growing.Insert(insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord) + err := growing.Insert(context.Background(), insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord) if err != nil { log.Error("failed to insert data into growing segment", zap.Int64("segmentID", segmentID), @@ -334,7 +335,7 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm } log.Info("forwarding L0 delete records...", zap.Int("deletionCount", len(deletedPks))) - err = segment.Delete(deletedPks, deletedTss) + err = segment.Delete(ctx, deletedPks, deletedTss) if err != nil { log.Warn("failed to forward L0 deletions to growing segment", zap.Error(err), diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index bd57fcf70c..f44871c826 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -235,7 +235,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() { ms.EXPECT().Partition().Return(info.GetPartitionID()) ms.EXPECT().Indexes().Return(nil) ms.EXPECT().RowNum().Return(info.GetNumOfRows()) - ms.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil) + ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool { return pk.EQ(storage.NewInt64PrimaryKey(10)) }) @@ -557,8 +557,8 @@ func (s *DelegatorDataSuite) TestLoadSegments() { mockErr := merr.WrapErrServiceInternal("mock") - growing0.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil) - growing1.EXPECT().Delete(mock.Anything, mock.Anything).Return(mockErr) + growing0.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) + growing1.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) s.loader.EXPECT().Load( mock.Anything, @@ -768,7 +768,7 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { ms.EXPECT().Collection().Return(info.GetCollectionID()) ms.EXPECT().Indexes().Return(nil) ms.EXPECT().RowNum().Return(info.GetNumOfRows()) - ms.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil) + ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool { return pk.EQ(storage.NewInt64PrimaryKey(10)) }) diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index bc2f70e72d..61f8e50bbb 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -94,7 +94,7 @@ func (s *DelegatorSuite) SetupTest() { ms.EXPECT().Collection().Return(info.GetCollectionID()) ms.EXPECT().Indexes().Return(nil) ms.EXPECT().RowNum().Return(info.GetNumOfRows()) - ms.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil) + ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) return ms }) }, nil) diff --git a/internal/querynodev2/segments/cgo_util.go b/internal/querynodev2/segments/cgo_util.go index fc5d8c4b15..7bcb7be055 100644 --- a/internal/querynodev2/segments/cgo_util.go +++ b/internal/querynodev2/segments/cgo_util.go @@ -27,20 +27,20 @@ package segments import "C" import ( - "fmt" + "context" "unsafe" - "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/cgoconverter" + "github.com/milvus-io/milvus/pkg/util/merr" ) // HandleCStatus deals with the error returned from CGO -func HandleCStatus(status *C.CStatus, extraInfo string, fields ...zap.Field) error { +func HandleCStatus(ctx context.Context, status *C.CStatus, extraInfo string, fields ...zap.Field) error { if status.error_code == 0 { return nil } @@ -52,11 +52,11 @@ func HandleCStatus(status *C.CStatus, extraInfo string, fields ...zap.Field) err errorMsg := C.GoString(status.error_msg) defer C.free(unsafe.Pointer(status.error_msg)) - finalMsg := fmt.Sprintf("%s: %s", errorName, errorMsg) - logMsg := fmt.Sprintf("%s, segcore error: %s\n", extraInfo, finalMsg) - log := log.With().WithOptions(zap.AddCallerSkip(1)) - log.Warn(logMsg, fields...) - return errors.New(finalMsg) + log.Ctx(ctx).With(fields...). + WithOptions(zap.AddCallerSkip(1)) // Add caller stack to show HandleCStatus caller + + log.Warn("CStatus returns err", zap.String("errorName", errorName), zap.String("errorMsg", errorMsg)) + return merr.WrapErrServiceInternal(errorName, errorMsg) } // HandleCProto deal with the result proto returned from CGO @@ -84,14 +84,14 @@ func GetCProtoBlob(cProto *C.CProto) []byte { return blob } -func GetLocalUsedSize(path string) (int64, error) { +func GetLocalUsedSize(ctx context.Context, path string) (int64, error) { var availableSize int64 cSize := (*C.int64_t)(&availableSize) cPath := C.CString(path) defer C.free(unsafe.Pointer(cPath)) status := C.GetLocalUsedSize(cPath, cSize) - err := HandleCStatus(&status, "get local used size failed") + err := HandleCStatus(ctx, &status, "get local used size failed") if err != nil { return 0, err } diff --git a/internal/querynodev2/segments/load_field_data_info.go b/internal/querynodev2/segments/load_field_data_info.go index 44b349d44e..b300b6ecf3 100644 --- a/internal/querynodev2/segments/load_field_data_info.go +++ b/internal/querynodev2/segments/load_field_data_info.go @@ -23,6 +23,7 @@ package segments import "C" import ( + "context" "unsafe" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -32,11 +33,11 @@ type LoadFieldDataInfo struct { cLoadFieldDataInfo C.CLoadFieldDataInfo } -func newLoadFieldDataInfo() (*LoadFieldDataInfo, error) { +func newLoadFieldDataInfo(ctx context.Context) (*LoadFieldDataInfo, error) { var cLoadFieldDataInfo C.CLoadFieldDataInfo status := C.NewLoadFieldDataInfo(&cLoadFieldDataInfo) - if err := HandleCStatus(&status, "newLoadFieldDataInfo failed"); err != nil { + if err := HandleCStatus(ctx, &status, "newLoadFieldDataInfo failed"); err != nil { return nil, err } return &LoadFieldDataInfo{cLoadFieldDataInfo: cLoadFieldDataInfo}, nil @@ -46,22 +47,22 @@ func deleteFieldDataInfo(info *LoadFieldDataInfo) { C.DeleteLoadFieldDataInfo(info.cLoadFieldDataInfo) } -func (ld *LoadFieldDataInfo) appendLoadFieldInfo(fieldID int64, rowCount int64) error { +func (ld *LoadFieldDataInfo) appendLoadFieldInfo(ctx context.Context, fieldID int64, rowCount int64) error { cFieldID := C.int64_t(fieldID) cRowCount := C.int64_t(rowCount) status := C.AppendLoadFieldInfo(ld.cLoadFieldDataInfo, cFieldID, cRowCount) - return HandleCStatus(&status, "appendLoadFieldInfo failed") + return HandleCStatus(ctx, &status, "appendLoadFieldInfo failed") } -func (ld *LoadFieldDataInfo) appendLoadFieldDataPath(fieldID int64, binlog *datapb.Binlog) error { +func (ld *LoadFieldDataInfo) appendLoadFieldDataPath(ctx context.Context, fieldID int64, binlog *datapb.Binlog) error { cFieldID := C.int64_t(fieldID) cEntriesNum := C.int64_t(binlog.GetEntriesNum()) cFile := C.CString(binlog.GetLogPath()) defer C.free(unsafe.Pointer(cFile)) status := C.AppendLoadFieldDataPath(ld.cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile) - return HandleCStatus(&status, "appendLoadFieldDataPath failed") + return HandleCStatus(ctx, &status, "appendLoadFieldDataPath failed") } func (ld *LoadFieldDataInfo) enableMmap(fieldID int64, enabled bool) { diff --git a/internal/querynodev2/segments/load_index_info.go b/internal/querynodev2/segments/load_index_info.go index 35ace62031..7bd6387457 100644 --- a/internal/querynodev2/segments/load_index_info.go +++ b/internal/querynodev2/segments/load_index_info.go @@ -25,6 +25,7 @@ package segments import "C" import ( + "context" "unsafe" "github.com/pingcap/log" @@ -45,11 +46,11 @@ type LoadIndexInfo struct { } // newLoadIndexInfo returns a new LoadIndexInfo and error -func newLoadIndexInfo() (*LoadIndexInfo, error) { +func newLoadIndexInfo(ctx context.Context) (*LoadIndexInfo, error) { var cLoadIndexInfo C.CLoadIndexInfo status := C.NewLoadIndexInfo(&cLoadIndexInfo) - if err := HandleCStatus(&status, "NewLoadIndexInfo failed"); err != nil { + if err := HandleCStatus(ctx, &status, "NewLoadIndexInfo failed"); err != nil { return nil, err } return &LoadIndexInfo{cLoadIndexInfo: cLoadIndexInfo}, nil @@ -60,7 +61,7 @@ func deleteLoadIndexInfo(info *LoadIndexInfo) { C.DeleteLoadIndexInfo(info.cLoadIndexInfo) } -func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error { +func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error { fieldID := indexInfo.FieldID indexPaths := indexInfo.IndexFilePaths @@ -70,12 +71,12 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, delete(indexParams, common.MmapEnabledKey) mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() - err := li.appendFieldInfo(collectionID, partitionID, segmentID, fieldID, fieldType, enableMmap, mmapDirPath) + err := li.appendFieldInfo(ctx, collectionID, partitionID, segmentID, fieldID, fieldType, enableMmap, mmapDirPath) if err != nil { return err } - err = li.appendIndexInfo(indexInfo.IndexID, indexInfo.BuildID, indexInfo.IndexVersion) + err = li.appendIndexInfo(ctx, indexInfo.IndexID, indexInfo.BuildID, indexInfo.IndexVersion) if err != nil { return err } @@ -95,54 +96,54 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, log.Info("load with index params", zap.Any("indexParams", indexParams)) for key, value := range indexParams { - err = li.appendIndexParam(key, value) + err = li.appendIndexParam(ctx, key, value) if err != nil { return err } } - if err := li.appendIndexEngineVersion(indexInfo.GetCurrentIndexVersion()); err != nil { + if err := li.appendIndexEngineVersion(ctx, indexInfo.GetCurrentIndexVersion()); err != nil { return err } - err = li.appendIndexData(indexPaths) + err = li.appendIndexData(ctx, indexPaths) return err } // appendIndexParam append indexParam to index -func (li *LoadIndexInfo) appendIndexParam(indexKey string, indexValue string) error { +func (li *LoadIndexInfo) appendIndexParam(ctx context.Context, indexKey string, indexValue string) error { cIndexKey := C.CString(indexKey) defer C.free(unsafe.Pointer(cIndexKey)) cIndexValue := C.CString(indexValue) defer C.free(unsafe.Pointer(cIndexValue)) status := C.AppendIndexParam(li.cLoadIndexInfo, cIndexKey, cIndexValue) - return HandleCStatus(&status, "AppendIndexParam failed") + return HandleCStatus(ctx, &status, "AppendIndexParam failed") } -func (li *LoadIndexInfo) appendIndexInfo(indexID int64, buildID int64, indexVersion int64) error { +func (li *LoadIndexInfo) appendIndexInfo(ctx context.Context, indexID int64, buildID int64, indexVersion int64) error { cIndexID := C.int64_t(indexID) cBuildID := C.int64_t(buildID) cIndexVersion := C.int64_t(indexVersion) status := C.AppendIndexInfo(li.cLoadIndexInfo, cIndexID, cBuildID, cIndexVersion) - return HandleCStatus(&status, "AppendIndexInfo failed") + return HandleCStatus(ctx, &status, "AppendIndexInfo failed") } -func (li *LoadIndexInfo) cleanLocalData() error { +func (li *LoadIndexInfo) cleanLocalData(ctx context.Context) error { status := C.CleanLoadedIndex(li.cLoadIndexInfo) - return HandleCStatus(&status, "failed to clean cached data on disk") + return HandleCStatus(ctx, &status, "failed to clean cached data on disk") } -func (li *LoadIndexInfo) appendIndexFile(filePath string) error { +func (li *LoadIndexInfo) appendIndexFile(ctx context.Context, filePath string) error { cIndexFilePath := C.CString(filePath) defer C.free(unsafe.Pointer(cIndexFilePath)) status := C.AppendIndexFilePath(li.cLoadIndexInfo, cIndexFilePath) - return HandleCStatus(&status, "AppendIndexIFile failed") + return HandleCStatus(ctx, &status, "AppendIndexIFile failed") } // appendFieldInfo appends fieldID & fieldType to index -func (li *LoadIndexInfo) appendFieldInfo(collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, enableMmap bool, mmapDirPath string) error { +func (li *LoadIndexInfo) appendFieldInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, enableMmap bool, mmapDirPath string) error { cColID := C.int64_t(collectionID) cParID := C.int64_t(partitionID) cSegID := C.int64_t(segmentID) @@ -152,25 +153,25 @@ func (li *LoadIndexInfo) appendFieldInfo(collectionID int64, partitionID int64, cMmapDirPath := C.CString(mmapDirPath) defer C.free(unsafe.Pointer(cMmapDirPath)) status := C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType, cEnableMmap, cMmapDirPath) - return HandleCStatus(&status, "AppendFieldInfo failed") + return HandleCStatus(ctx, &status, "AppendFieldInfo failed") } // appendIndexData appends index path to cLoadIndexInfo and create index -func (li *LoadIndexInfo) appendIndexData(indexKeys []string) error { +func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string) error { for _, indexPath := range indexKeys { - err := li.appendIndexFile(indexPath) + err := li.appendIndexFile(ctx, indexPath) if err != nil { return err } } status := C.AppendIndexV2(li.cLoadIndexInfo) - return HandleCStatus(&status, "AppendIndex failed") + return HandleCStatus(ctx, &status, "AppendIndex failed") } -func (li *LoadIndexInfo) appendIndexEngineVersion(indexEngineVersion int32) error { +func (li *LoadIndexInfo) appendIndexEngineVersion(ctx context.Context, indexEngineVersion int32) error { cIndexEngineVersion := C.int32_t(indexEngineVersion) status := C.AppendIndexEngineVersionToLoadInfo(li.cLoadIndexInfo, cIndexEngineVersion) - return HandleCStatus(&status, "AppendIndexEngineVersion failed") + return HandleCStatus(ctx, &status, "AppendIndexEngineVersion failed") } diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index 4e75e71a7d..86c552e7c4 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -1,6 +1,7 @@ package segments import ( + "context" "testing" "github.com/samber/lo" @@ -44,6 +45,7 @@ func (s *ManagerSuite) SetupTest() { for i, id := range s.segmentIDs { schema := GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64) segment, err := NewSegment( + context.Background(), NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), querypb.LoadType_LoadCollection), id, s.partitionIDs[i], diff --git a/internal/querynodev2/segments/mock_data.go b/internal/querynodev2/segments/mock_data.go index fd6318369d..19a199ef75 100644 --- a/internal/querynodev2/segments/mock_data.go +++ b/internal/querynodev2/segments/mock_data.go @@ -1141,7 +1141,7 @@ func genHNSWDSL(schema *schemapb.CollectionSchema, ef int, topK int64, roundDeci >`, nil } -func checkSearchResult(nq int64, plan *SearchPlan, searchResult *SearchResult) error { +func checkSearchResult(ctx context.Context, nq int64, plan *SearchPlan, searchResult *SearchResult) error { searchResults := make([]*SearchResult, 0) searchResults = append(searchResults, searchResult) @@ -1150,13 +1150,13 @@ func checkSearchResult(nq int64, plan *SearchPlan, searchResult *SearchResult) e sliceTopKs := []int64{topK, topK / 2, topK, topK, topK / 2} sInfo := ParseSliceInfo(sliceNQs, sliceTopKs, nq) - res, err := ReduceSearchResultsAndFillData(plan, searchResults, 1, sInfo.SliceNQs, sInfo.SliceTopKs) + res, err := ReduceSearchResultsAndFillData(ctx, plan, searchResults, 1, sInfo.SliceNQs, sInfo.SliceTopKs) if err != nil { return err } for i := 0; i < len(sInfo.SliceNQs); i++ { - blob, err := GetSearchResultDataBlob(res, i) + blob, err := GetSearchResultDataBlob(ctx, res, i) if err != nil { return err } @@ -1199,7 +1199,7 @@ func genSearchPlanAndRequests(collection *Collection, segments []int64, indexTyp FromShardLeader: true, Scope: querypb.DataScope_Historical, } - return NewSearchRequest(collection, queryReq, queryReq.Req.GetPlaceholderGroup()) + return NewSearchRequest(context.Background(), collection, queryReq, queryReq.Req.GetPlaceholderGroup()) } func genInsertMsg(collection *Collection, partitionID, segment int64, numRows int) (*msgstream.InsertMsg, error) { @@ -1301,7 +1301,7 @@ func genSimpleRetrievePlan(collection *Collection) (*RetrievePlan, error) { return nil, err } - plan, err2 := NewRetrievePlan(collection, planBytes, timestamp, 100) + plan, err2 := NewRetrievePlan(context.Background(), collection, planBytes, timestamp, 100) return plan, err2 } diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index 1588bb32d1..5a5bf0769a 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -115,13 +115,13 @@ func (_c *MockSegment_Collection_Call) RunAndReturn(run func() int64) *MockSegme return _c } -// Delete provides a mock function with given fields: primaryKeys, timestamps -func (_m *MockSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []uint64) error { - ret := _m.Called(primaryKeys, timestamps) +// Delete provides a mock function with given fields: ctx, primaryKeys, timestamps +func (_m *MockSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64) error { + ret := _m.Called(ctx, primaryKeys, timestamps) var r0 error - if rf, ok := ret.Get(0).(func([]storage.PrimaryKey, []uint64) error); ok { - r0 = rf(primaryKeys, timestamps) + if rf, ok := ret.Get(0).(func(context.Context, []storage.PrimaryKey, []uint64) error); ok { + r0 = rf(ctx, primaryKeys, timestamps) } else { r0 = ret.Error(0) } @@ -135,15 +135,16 @@ type MockSegment_Delete_Call struct { } // Delete is a helper method to define mock.On call +// - ctx context.Context // - primaryKeys []storage.PrimaryKey // - timestamps []uint64 -func (_e *MockSegment_Expecter) Delete(primaryKeys interface{}, timestamps interface{}) *MockSegment_Delete_Call { - return &MockSegment_Delete_Call{Call: _e.mock.On("Delete", primaryKeys, timestamps)} +func (_e *MockSegment_Expecter) Delete(ctx interface{}, primaryKeys interface{}, timestamps interface{}) *MockSegment_Delete_Call { + return &MockSegment_Delete_Call{Call: _e.mock.On("Delete", ctx, primaryKeys, timestamps)} } -func (_c *MockSegment_Delete_Call) Run(run func(primaryKeys []storage.PrimaryKey, timestamps []uint64)) *MockSegment_Delete_Call { +func (_c *MockSegment_Delete_Call) Run(run func(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64)) *MockSegment_Delete_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]storage.PrimaryKey), args[1].([]uint64)) + run(args[0].(context.Context), args[1].([]storage.PrimaryKey), args[2].([]uint64)) }) return _c } @@ -153,7 +154,7 @@ func (_c *MockSegment_Delete_Call) Return(_a0 error) *MockSegment_Delete_Call { return _c } -func (_c *MockSegment_Delete_Call) RunAndReturn(run func([]storage.PrimaryKey, []uint64) error) *MockSegment_Delete_Call { +func (_c *MockSegment_Delete_Call) RunAndReturn(run func(context.Context, []storage.PrimaryKey, []uint64) error) *MockSegment_Delete_Call { _c.Call.Return(run) return _c } @@ -370,13 +371,13 @@ func (_c *MockSegment_Indexes_Call) RunAndReturn(run func() []*IndexedFieldInfo) return _c } -// Insert provides a mock function with given fields: rowIDs, timestamps, record -func (_m *MockSegment) Insert(rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord) error { - ret := _m.Called(rowIDs, timestamps, record) +// Insert provides a mock function with given fields: ctx, rowIDs, timestamps, record +func (_m *MockSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord) error { + ret := _m.Called(ctx, rowIDs, timestamps, record) var r0 error - if rf, ok := ret.Get(0).(func([]int64, []uint64, *segcorepb.InsertRecord) error); ok { - r0 = rf(rowIDs, timestamps, record) + if rf, ok := ret.Get(0).(func(context.Context, []int64, []uint64, *segcorepb.InsertRecord) error); ok { + r0 = rf(ctx, rowIDs, timestamps, record) } else { r0 = ret.Error(0) } @@ -390,16 +391,17 @@ type MockSegment_Insert_Call struct { } // Insert is a helper method to define mock.On call +// - ctx context.Context // - rowIDs []int64 // - timestamps []uint64 // - record *segcorepb.InsertRecord -func (_e *MockSegment_Expecter) Insert(rowIDs interface{}, timestamps interface{}, record interface{}) *MockSegment_Insert_Call { - return &MockSegment_Insert_Call{Call: _e.mock.On("Insert", rowIDs, timestamps, record)} +func (_e *MockSegment_Expecter) Insert(ctx interface{}, rowIDs interface{}, timestamps interface{}, record interface{}) *MockSegment_Insert_Call { + return &MockSegment_Insert_Call{Call: _e.mock.On("Insert", ctx, rowIDs, timestamps, record)} } -func (_c *MockSegment_Insert_Call) Run(run func(rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord)) *MockSegment_Insert_Call { +func (_c *MockSegment_Insert_Call) Run(run func(ctx context.Context, rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord)) *MockSegment_Insert_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]int64), args[1].([]uint64), args[2].(*segcorepb.InsertRecord)) + run(args[0].(context.Context), args[1].([]int64), args[2].([]uint64), args[3].(*segcorepb.InsertRecord)) }) return _c } @@ -409,7 +411,7 @@ func (_c *MockSegment_Insert_Call) Return(_a0 error) *MockSegment_Insert_Call { return _c } -func (_c *MockSegment_Insert_Call) RunAndReturn(run func([]int64, []uint64, *segcorepb.InsertRecord) error) *MockSegment_Insert_Call { +func (_c *MockSegment_Insert_Call) RunAndReturn(run func(context.Context, []int64, []uint64, *segcorepb.InsertRecord) error) *MockSegment_Insert_Call { _c.Call.Return(run) return _c } @@ -537,13 +539,13 @@ func (_c *MockSegment_Level_Call) RunAndReturn(run func() datapb.SegmentLevel) * return _c } -// LoadDeltaData provides a mock function with given fields: deltaData -func (_m *MockSegment) LoadDeltaData(deltaData *storage.DeleteData) error { - ret := _m.Called(deltaData) +// LoadDeltaData provides a mock function with given fields: ctx, deltaData +func (_m *MockSegment) LoadDeltaData(ctx context.Context, deltaData *storage.DeleteData) error { + ret := _m.Called(ctx, deltaData) var r0 error - if rf, ok := ret.Get(0).(func(*storage.DeleteData) error); ok { - r0 = rf(deltaData) + if rf, ok := ret.Get(0).(func(context.Context, *storage.DeleteData) error); ok { + r0 = rf(ctx, deltaData) } else { r0 = ret.Error(0) } @@ -557,14 +559,15 @@ type MockSegment_LoadDeltaData_Call struct { } // LoadDeltaData is a helper method to define mock.On call +// - ctx context.Context // - deltaData *storage.DeleteData -func (_e *MockSegment_Expecter) LoadDeltaData(deltaData interface{}) *MockSegment_LoadDeltaData_Call { - return &MockSegment_LoadDeltaData_Call{Call: _e.mock.On("LoadDeltaData", deltaData)} +func (_e *MockSegment_Expecter) LoadDeltaData(ctx interface{}, deltaData interface{}) *MockSegment_LoadDeltaData_Call { + return &MockSegment_LoadDeltaData_Call{Call: _e.mock.On("LoadDeltaData", ctx, deltaData)} } -func (_c *MockSegment_LoadDeltaData_Call) Run(run func(deltaData *storage.DeleteData)) *MockSegment_LoadDeltaData_Call { +func (_c *MockSegment_LoadDeltaData_Call) Run(run func(ctx context.Context, deltaData *storage.DeleteData)) *MockSegment_LoadDeltaData_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*storage.DeleteData)) + run(args[0].(context.Context), args[1].(*storage.DeleteData)) }) return _c } @@ -574,7 +577,7 @@ func (_c *MockSegment_LoadDeltaData_Call) Return(_a0 error) *MockSegment_LoadDel return _c } -func (_c *MockSegment_LoadDeltaData_Call) RunAndReturn(run func(*storage.DeleteData) error) *MockSegment_LoadDeltaData_Call { +func (_c *MockSegment_LoadDeltaData_Call) RunAndReturn(run func(context.Context, *storage.DeleteData) error) *MockSegment_LoadDeltaData_Call { _c.Call.Return(run) return _c } diff --git a/internal/querynodev2/segments/mock_segment_manager.go b/internal/querynodev2/segments/mock_segment_manager.go index 1d86c3a5d5..17ac69b5f5 100644 --- a/internal/querynodev2/segments/mock_segment_manager.go +++ b/internal/querynodev2/segments/mock_segment_manager.go @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" querypb "github.com/milvus-io/milvus/internal/proto/querypb" - - storage "github.com/milvus-io/milvus/internal/storage" ) // MockSegmentManager is an autogenerated mock type for the SegmentManager type @@ -378,61 +376,6 @@ func (_c *MockSegmentManager_GetGrowing_Call) RunAndReturn(run func(int64) Segme return _c } -// GetL0DeleteRecords provides a mock function with given fields: -func (_m *MockSegmentManager) GetL0DeleteRecords() ([]storage.PrimaryKey, []uint64) { - ret := _m.Called() - - var r0 []storage.PrimaryKey - var r1 []uint64 - if rf, ok := ret.Get(0).(func() ([]storage.PrimaryKey, []uint64)); ok { - return rf() - } - if rf, ok := ret.Get(0).(func() []storage.PrimaryKey); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]storage.PrimaryKey) - } - } - - if rf, ok := ret.Get(1).(func() []uint64); ok { - r1 = rf() - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).([]uint64) - } - } - - return r0, r1 -} - -// MockSegmentManager_GetL0DeleteRecords_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetL0DeleteRecords' -type MockSegmentManager_GetL0DeleteRecords_Call struct { - *mock.Call -} - -// GetL0DeleteRecords is a helper method to define mock.On call -func (_e *MockSegmentManager_Expecter) GetL0DeleteRecords() *MockSegmentManager_GetL0DeleteRecords_Call { - return &MockSegmentManager_GetL0DeleteRecords_Call{Call: _e.mock.On("GetL0DeleteRecords")} -} - -func (_c *MockSegmentManager_GetL0DeleteRecords_Call) Run(run func()) *MockSegmentManager_GetL0DeleteRecords_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockSegmentManager_GetL0DeleteRecords_Call) Return(_a0 []storage.PrimaryKey, _a1 []uint64) *MockSegmentManager_GetL0DeleteRecords_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockSegmentManager_GetL0DeleteRecords_Call) RunAndReturn(run func() ([]storage.PrimaryKey, []uint64)) *MockSegmentManager_GetL0DeleteRecords_Call { - _c.Call.Return(run) - return _c -} - // GetSealed provides a mock function with given fields: segmentID func (_m *MockSegmentManager) GetSealed(segmentID int64) Segment { ret := _m.Called(segmentID) diff --git a/internal/querynodev2/segments/plan.go b/internal/querynodev2/segments/plan.go index a9f07bfc78..edc01d27e0 100644 --- a/internal/querynodev2/segments/plan.go +++ b/internal/querynodev2/segments/plan.go @@ -26,6 +26,7 @@ package segments import "C" import ( + "context" "fmt" "unsafe" @@ -41,14 +42,14 @@ type SearchPlan struct { cSearchPlan C.CSearchPlan } -func createSearchPlanByExpr(col *Collection, expr []byte, metricType string) (*SearchPlan, error) { +func createSearchPlanByExpr(ctx context.Context, col *Collection, expr []byte, metricType string) (*SearchPlan, error) { if col.collectionPtr == nil { return nil, errors.New("nil collection ptr, collectionID = " + fmt.Sprintln(col.id)) } var cPlan C.CSearchPlan status := C.CreateSearchPlanByExpr(col.collectionPtr, unsafe.Pointer(&expr[0]), (C.int64_t)(len(expr)), &cPlan) - err1 := HandleCStatus(&status, "Create Plan by expr failed") + err1 := HandleCStatus(ctx, &status, "Create Plan by expr failed") if err1 != nil { return nil, err1 } @@ -91,12 +92,12 @@ type SearchRequest struct { searchFieldID UniqueID } -func NewSearchRequest(collection *Collection, req *querypb.SearchRequest, placeholderGrp []byte) (*SearchRequest, error) { +func NewSearchRequest(ctx context.Context, collection *Collection, req *querypb.SearchRequest, placeholderGrp []byte) (*SearchRequest, error) { var err error var plan *SearchPlan metricType := req.GetReq().GetMetricType() expr := req.Req.SerializedExprPlan - plan, err = createSearchPlanByExpr(collection, expr, metricType) + plan, err = createSearchPlanByExpr(ctx, collection, expr, metricType) if err != nil { return nil, err } @@ -111,14 +112,14 @@ func NewSearchRequest(collection *Collection, req *querypb.SearchRequest, placeh var cPlaceholderGroup C.CPlaceholderGroup status := C.ParsePlaceholderGroup(plan.cSearchPlan, blobPtr, blobSize, &cPlaceholderGroup) - if err := HandleCStatus(&status, "parser searchRequest failed"); err != nil { + if err := HandleCStatus(ctx, &status, "parser searchRequest failed"); err != nil { plan.delete() return nil, err } var fieldID C.int64_t status = C.GetFieldID(plan.cSearchPlan, &fieldID) - if err = HandleCStatus(&status, "get fieldID from plan failed"); err != nil { + if err = HandleCStatus(ctx, &status, "get fieldID from plan failed"); err != nil { plan.delete() return nil, err } @@ -149,7 +150,7 @@ func (req *SearchRequest) Delete() { C.DeletePlaceholderGroup(req.cPlaceholderGroup) } -func parseSearchRequest(plan *SearchPlan, searchRequestBlob []byte) (*SearchRequest, error) { +func parseSearchRequest(ctx context.Context, plan *SearchPlan, searchRequestBlob []byte) (*SearchRequest, error) { if len(searchRequestBlob) == 0 { return nil, fmt.Errorf("empty search request") } @@ -158,7 +159,7 @@ func parseSearchRequest(plan *SearchPlan, searchRequestBlob []byte) (*SearchRequ var cPlaceholderGroup C.CPlaceholderGroup status := C.ParsePlaceholderGroup(plan.cSearchPlan, blobPtr, blobSize, &cPlaceholderGroup) - if err := HandleCStatus(&status, "parser searchRequest failed"); err != nil { + if err := HandleCStatus(ctx, &status, "parser searchRequest failed"); err != nil { return nil, err } @@ -173,7 +174,7 @@ type RetrievePlan struct { msgID UniqueID // only used to debug. } -func NewRetrievePlan(col *Collection, expr []byte, timestamp Timestamp, msgID UniqueID) (*RetrievePlan, error) { +func NewRetrievePlan(ctx context.Context, col *Collection, expr []byte, timestamp Timestamp, msgID UniqueID) (*RetrievePlan, error) { col.mu.RLock() defer col.mu.RUnlock() @@ -184,7 +185,7 @@ func NewRetrievePlan(col *Collection, expr []byte, timestamp Timestamp, msgID Un var cPlan C.CRetrievePlan status := C.CreateRetrievePlanByExpr(col.collectionPtr, unsafe.Pointer(&expr[0]), (C.int64_t)(len(expr)), &cPlan) - err := HandleCStatus(&status, "Create retrieve plan by expr failed") + err := HandleCStatus(ctx, &status, "Create retrieve plan by expr failed") if err != nil { return nil, err } diff --git a/internal/querynodev2/segments/plan_test.go b/internal/querynodev2/segments/plan_test.go index abd41f3636..6fd7772dc1 100644 --- a/internal/querynodev2/segments/plan_test.go +++ b/internal/querynodev2/segments/plan_test.go @@ -17,6 +17,7 @@ package segments import ( + "context" "testing" "github.com/golang/protobuf/proto" @@ -57,7 +58,7 @@ func (suite *PlanSuite) TestPlanCreateByExpr() { expr, err := proto.Marshal(planNode) suite.NoError(err) - _, err = createSearchPlanByExpr(suite.collection, expr, "") + _, err = createSearchPlanByExpr(context.Background(), suite.collection, expr, "") suite.Error(err) } @@ -66,13 +67,13 @@ func (suite *PlanSuite) TestPlanFail() { id: -1, } - _, err := createSearchPlanByExpr(collection, nil, "") + _, err := createSearchPlanByExpr(context.Background(), collection, nil, "") suite.Error(err) } func (suite *PlanSuite) TestQueryPlanCollectionReleased() { collection := &Collection{id: suite.collectionID} - _, err := NewRetrievePlan(collection, nil, 0, 0) + _, err := NewRetrievePlan(context.Background(), collection, nil, 0, 0) suite.Error(err) } diff --git a/internal/querynodev2/segments/reduce.go b/internal/querynodev2/segments/reduce.go index 7e8ec94441..f06e7c67be 100644 --- a/internal/querynodev2/segments/reduce.go +++ b/internal/querynodev2/segments/reduce.go @@ -25,6 +25,7 @@ package segments import "C" import ( + "context" "fmt" ) @@ -70,7 +71,7 @@ func ParseSliceInfo(originNQs []int64, originTopKs []int64, nqPerSlice int64) *S return sInfo } -func ReduceSearchResultsAndFillData(plan *SearchPlan, searchResults []*SearchResult, +func ReduceSearchResultsAndFillData(ctx context.Context, plan *SearchPlan, searchResults []*SearchResult, numSegments int64, sliceNQs []int64, sliceTopKs []int64, ) (searchResultDataBlobs, error) { if plan.cSearchPlan == nil { @@ -100,16 +101,16 @@ func ReduceSearchResultsAndFillData(plan *SearchPlan, searchResults []*SearchRes var cSearchResultDataBlobs searchResultDataBlobs status := C.ReduceSearchResultsAndFillData(&cSearchResultDataBlobs, plan.cSearchPlan, cSearchResultPtr, cNumSegments, cSliceNQSPtr, cSliceTopKSPtr, cNumSlices) - if err := HandleCStatus(&status, "ReduceSearchResultsAndFillData failed"); err != nil { + if err := HandleCStatus(ctx, &status, "ReduceSearchResultsAndFillData failed"); err != nil { return nil, err } return cSearchResultDataBlobs, nil } -func GetSearchResultDataBlob(cSearchResultDataBlobs searchResultDataBlobs, blobIndex int) ([]byte, error) { +func GetSearchResultDataBlob(ctx context.Context, cSearchResultDataBlobs searchResultDataBlobs, blobIndex int) ([]byte, error) { var blob C.CProto status := C.GetSearchResultDataBlob(&blob, cSearchResultDataBlobs, C.int32_t(blobIndex)) - if err := HandleCStatus(&status, "marshal failed"); err != nil { + if err := HandleCStatus(ctx, &status, "marshal failed"); err != nil { return nil, err } return GetCProtoBlob(&blob), nil diff --git a/internal/querynodev2/segments/reduce_test.go b/internal/querynodev2/segments/reduce_test.go index e0ecae846d..2381cdd54e 100644 --- a/internal/querynodev2/segments/reduce_test.go +++ b/internal/querynodev2/segments/reduce_test.go @@ -72,7 +72,8 @@ func (suite *ReduceSuite) SetupTest() { GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, ) - suite.segment, err = NewSegment(suite.collection, + suite.segment, err = NewSegment(ctx, + suite.collection, suite.segmentID, suite.partitionID, suite.collectionID, @@ -95,7 +96,7 @@ func (suite *ReduceSuite) SetupTest() { ) suite.Require().NoError(err) for _, binlog := range binlogs { - err = suite.segment.(*LocalSegment).LoadFieldData(binlog.FieldID, int64(msgLength), binlog, false) + err = suite.segment.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false) suite.Require().NoError(err) } } @@ -164,29 +165,29 @@ func (suite *ReduceSuite) TestReduceAllFunc() { proto.UnmarshalText(planStr, &planpb) serializedPlan, err := proto.Marshal(&planpb) suite.NoError(err) - plan, err := createSearchPlanByExpr(suite.collection, serializedPlan, "") + plan, err := createSearchPlanByExpr(context.Background(), suite.collection, serializedPlan, "") suite.NoError(err) - searchReq, err := parseSearchRequest(plan, placeGroupByte) + searchReq, err := parseSearchRequest(context.Background(), plan, placeGroupByte) suite.NoError(err) defer searchReq.Delete() searchResult, err := suite.segment.Search(context.Background(), searchReq) suite.NoError(err) - err = checkSearchResult(nq, plan, searchResult) + err = checkSearchResult(context.Background(), nq, plan, searchResult) suite.NoError(err) } func (suite *ReduceSuite) TestReduceInvalid() { plan := &SearchPlan{} - _, err := ReduceSearchResultsAndFillData(plan, nil, 1, nil, nil) + _, err := ReduceSearchResultsAndFillData(context.Background(), plan, nil, 1, nil, nil) suite.Error(err) searchReq, err := genSearchPlanAndRequests(suite.collection, []int64{suite.segmentID}, IndexHNSW, 10) suite.NoError(err) searchResults := make([]*SearchResult, 0) searchResults = append(searchResults, nil) - _, err = ReduceSearchResultsAndFillData(searchReq.plan, searchResults, 1, []int64{10}, []int64{10}) + _, err = ReduceSearchResultsAndFillData(context.Background(), searchReq.plan, searchResults, 1, []int64{10}, []int64{10}) suite.Error(err) } diff --git a/internal/querynodev2/segments/retrieve_test.go b/internal/querynodev2/segments/retrieve_test.go index 2c48f9ec04..2d459378cd 100644 --- a/internal/querynodev2/segments/retrieve_test.go +++ b/internal/querynodev2/segments/retrieve_test.go @@ -83,7 +83,8 @@ func (suite *RetrieveSuite) SetupTest() { ) suite.collection = suite.manager.Collection.Get(suite.collectionID) - suite.sealed, err = NewSegment(suite.collection, + suite.sealed, err = NewSegment(ctx, + suite.collection, suite.segmentID, suite.partitionID, suite.collectionID, @@ -106,11 +107,12 @@ func (suite *RetrieveSuite) SetupTest() { ) suite.Require().NoError(err) for _, binlog := range binlogs { - err = suite.sealed.(*LocalSegment).LoadFieldData(binlog.FieldID, int64(msgLength), binlog, false) + err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false) suite.Require().NoError(err) } - suite.growing, err = NewSegment(suite.collection, + suite.growing, err = NewSegment(ctx, + suite.collection, suite.segmentID+1, suite.partitionID, suite.collectionID, @@ -127,7 +129,7 @@ func (suite *RetrieveSuite) SetupTest() { suite.Require().NoError(err) insertRecord, err := storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg) suite.Require().NoError(err) - err = suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) + err = suite.growing.Insert(ctx, insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) suite.Require().NoError(err) suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed) diff --git a/internal/querynodev2/segments/search_test.go b/internal/querynodev2/segments/search_test.go index 17116de30a..db657a275b 100644 --- a/internal/querynodev2/segments/search_test.go +++ b/internal/querynodev2/segments/search_test.go @@ -74,7 +74,8 @@ func (suite *SearchSuite) SetupTest() { ) suite.collection = suite.manager.Collection.Get(suite.collectionID) - suite.sealed, err = NewSegment(suite.collection, + suite.sealed, err = NewSegment(ctx, + suite.collection, suite.segmentID, suite.partitionID, suite.collectionID, @@ -97,11 +98,12 @@ func (suite *SearchSuite) SetupTest() { ) suite.Require().NoError(err) for _, binlog := range binlogs { - err = suite.sealed.(*LocalSegment).LoadFieldData(binlog.FieldID, int64(msgLength), binlog, false) + err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false) suite.Require().NoError(err) } - suite.growing, err = NewSegment(suite.collection, + suite.growing, err = NewSegment(ctx, + suite.collection, suite.segmentID+1, suite.partitionID, suite.collectionID, @@ -118,7 +120,7 @@ func (suite *SearchSuite) SetupTest() { suite.Require().NoError(err) insertRecord, err := storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg) suite.Require().NoError(err) - suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) + suite.growing.Insert(ctx, insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed) suite.manager.Segment.Put(SegmentTypeGrowing, suite.growing) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 5b312e67c4..2d4d357fd3 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -157,7 +157,8 @@ type LocalSegment struct { fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] } -func NewSegment(collection *Collection, +func NewSegment(ctx context.Context, + collection *Collection, segmentID int64, partitionID int64, collectionID int64, @@ -168,6 +169,7 @@ func NewSegment(collection *Collection, deltaPosition *msgpb.MsgPosition, level datapb.SegmentLevel, ) (Segment, error) { + log := log.Ctx(ctx) /* CStatus NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type, CSegmentInterface* newSegment); @@ -188,7 +190,7 @@ func NewSegment(collection *Collection, var newPtr C.CSegmentInterface _, err := GetDynamicPool().Submit(func() (any, error) { status := C.NewSegment(collection.collectionPtr, cSegType, C.int64_t(segmentID), &newPtr) - err := HandleCStatus(&status, "NewSegmentFailed", + err := HandleCStatus(ctx, &status, "NewSegmentFailed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID), @@ -387,7 +389,7 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) return nil, nil }).Await() - if err := HandleCStatus(&status, "Search failed", + if err := HandleCStatus(ctx, &status, "Search failed", zap.Int64("collectionID", s.Collection()), zap.Int64("segmentID", s.ID()), zap.String("segmentType", s.typ.String())); err != nil { @@ -442,7 +444,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco return nil, nil }).Await() - if err := HandleCStatus(&status, "Retrieve failed", + if err := HandleCStatus(ctx, &status, "Retrieve failed", zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), @@ -479,7 +481,7 @@ func (s *LocalSegment) GetFieldDataPath(index *IndexedFieldInfo, offset int64) ( } // -------------------------------------------------------------------------------------- interfaces for growing segment -func (s *LocalSegment) preInsert(numOfRecords int) (int64, error) { +func (s *LocalSegment) preInsert(ctx context.Context, numOfRecords int) (int64, error) { /* long int PreInsert(CSegmentInterface c_segment, long int size); @@ -492,13 +494,13 @@ func (s *LocalSegment) preInsert(numOfRecords int) (int64, error) { status = C.PreInsert(s.ptr, C.int64_t(int64(numOfRecords)), cOffset) return nil, nil }).Await() - if err := HandleCStatus(&status, "PreInsert failed"); err != nil { + if err := HandleCStatus(ctx, &status, "PreInsert failed"); err != nil { return 0, err } return offset, nil } -func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error { +func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error { if s.Type() != SegmentTypeGrowing { return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.typ.String()) } @@ -510,7 +512,7 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } - offset, err := s.preInsert(len(rowIDs)) + offset, err := s.preInsert(ctx, len(rowIDs)) if err != nil { return err } @@ -539,7 +541,7 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r ) return nil, nil }).Await() - if err := HandleCStatus(&status, "Insert failed"); err != nil { + if err := HandleCStatus(ctx, &status, "Insert failed"); err != nil { return err } @@ -556,7 +558,7 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r return nil } -func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error { +func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error { /* CStatus Delete(CSegmentInterface c_segment, @@ -624,7 +626,7 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ return nil, nil }).Await() - if err := HandleCStatus(&status, "Delete failed"); err != nil { + if err := HandleCStatus(ctx, &status, "Delete failed"); err != nil { return err } @@ -635,7 +637,7 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ } // -------------------------------------------------------------------------------------- interfaces for sealed segment -func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.FieldBinlog) error { +func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error { s.ptrLock.RLock() defer s.ptrLock.RUnlock() @@ -643,13 +645,13 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } - log := log.With( + log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), ) - loadFieldDataInfo, err := newLoadFieldDataInfo() + loadFieldDataInfo, err := newLoadFieldDataInfo(ctx) defer deleteFieldDataInfo(loadFieldDataInfo) if err != nil { return err @@ -657,13 +659,13 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field for _, field := range fields { fieldID := field.FieldID - err = loadFieldDataInfo.appendLoadFieldInfo(fieldID, rowCount) + err = loadFieldDataInfo.appendLoadFieldInfo(ctx, fieldID, rowCount) if err != nil { return err } for _, binlog := range field.Binlogs { - err = loadFieldDataInfo.appendLoadFieldDataPath(fieldID, binlog) + err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog) if err != nil { return err } @@ -677,7 +679,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) return nil, nil }).Await() - if err := HandleCStatus(&status, "LoadMultiFieldData failed", + if err := HandleCStatus(ctx, &status, "LoadMultiFieldData failed", zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID())); err != nil { @@ -692,7 +694,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field return nil } -func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datapb.FieldBinlog, mmapEnabled bool) error { +func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog, mmapEnabled bool) error { s.ptrLock.RLock() defer s.ptrLock.RUnlock() @@ -700,7 +702,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } - log := log.With( + log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), @@ -709,19 +711,19 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap ) log.Info("start loading field data for field") - loadFieldDataInfo, err := newLoadFieldDataInfo() + loadFieldDataInfo, err := newLoadFieldDataInfo(ctx) defer deleteFieldDataInfo(loadFieldDataInfo) if err != nil { return err } - err = loadFieldDataInfo.appendLoadFieldInfo(fieldID, rowCount) + err = loadFieldDataInfo.appendLoadFieldInfo(ctx, fieldID, rowCount) if err != nil { return err } for _, binlog := range field.Binlogs { - err = loadFieldDataInfo.appendLoadFieldDataPath(fieldID, binlog) + err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog) if err != nil { return err } @@ -735,7 +737,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) return nil, nil }).Await() - if err := HandleCStatus(&status, "LoadFieldData failed", + if err := HandleCStatus(ctx, &status, "LoadFieldData failed", zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), @@ -749,7 +751,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap return nil } -func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBinlog) error { +func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error { s.ptrLock.RLock() defer s.ptrLock.RUnlock() @@ -757,14 +759,14 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } - log := log.With( + log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), zap.Int64("row count", rowCount), ) - loadFieldDataInfo, err := newLoadFieldDataInfo() + loadFieldDataInfo, err := newLoadFieldDataInfo(ctx) defer deleteFieldDataInfo(loadFieldDataInfo) if err != nil { return err @@ -772,13 +774,13 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi for _, field := range fields { fieldID := field.FieldID - err = loadFieldDataInfo.appendLoadFieldInfo(fieldID, rowCount) + err = loadFieldDataInfo.appendLoadFieldInfo(ctx, fieldID, rowCount) if err != nil { return err } for _, binlog := range field.Binlogs { - err = loadFieldDataInfo.appendLoadFieldDataPath(fieldID, binlog) + err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog) if err != nil { return err } @@ -790,7 +792,7 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi status = C.AddFieldDataInfoForSealed(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) return nil, nil }).Await() - if err := HandleCStatus(&status, "AddFieldDataInfo failed", + if err := HandleCStatus(ctx, &status, "AddFieldDataInfo failed", zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID())); err != nil { @@ -801,7 +803,7 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi return nil } -func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error { +func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.DeleteData) error { pks, tss := deltaData.Pks, deltaData.Tss rowNum := deltaData.RowCount @@ -812,7 +814,7 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error { return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } - log := log.With( + log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), @@ -866,7 +868,7 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error { return nil, nil }).Await() - if err := HandleCStatus(&status, "LoadDeletedRecord failed", + if err := HandleCStatus(ctx, &status, "LoadDeletedRecord failed", zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID())); err != nil { @@ -882,16 +884,16 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error { return nil } -func (s *LocalSegment) LoadIndex(indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error { - loadIndexInfo, err := newLoadIndexInfo() +func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error { + loadIndexInfo, err := newLoadIndexInfo(ctx) defer deleteLoadIndexInfo(loadIndexInfo) if err != nil { return err } - err = loadIndexInfo.appendLoadIndexInfo(indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType) + err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType) if err != nil { - if loadIndexInfo.cleanLocalData() != nil { + if loadIndexInfo.cleanLocalData(ctx) != nil { log.Warn("failed to clean cached data on disk after append index failed", zap.Int64("buildID", indexInfo.BuildID), zap.Int64("index version", indexInfo.IndexVersion)) @@ -903,10 +905,10 @@ func (s *LocalSegment) LoadIndex(indexInfo *querypb.FieldIndexInfo, fieldType sc return errors.New(errMsg) } - return s.LoadIndexInfo(indexInfo, loadIndexInfo) + return s.LoadIndexInfo(ctx, indexInfo, loadIndexInfo) } -func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error { +func (s *LocalSegment) LoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error { log := log.With( zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), @@ -926,7 +928,7 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo return nil, nil }).Await() - if err := HandleCStatus(&status, "UpdateSealedSegmentIndex failed", + if err := HandleCStatus(ctx, &status, "UpdateSealedSegmentIndex failed", zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), @@ -938,7 +940,7 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo return nil } -func (s *LocalSegment) UpdateFieldRawDataSize(numRows int64, fieldBinlog *datapb.FieldBinlog) error { +func (s *LocalSegment) UpdateFieldRawDataSize(ctx context.Context, numRows int64, fieldBinlog *datapb.FieldBinlog) error { var status C.CStatus fieldID := fieldBinlog.FieldID fieldDataSize := int64(0) @@ -950,7 +952,7 @@ func (s *LocalSegment) UpdateFieldRawDataSize(numRows int64, fieldBinlog *datapb return nil, nil }).Await() - if err := HandleCStatus(&status, "updateFieldRawDataSize failed"); err != nil { + if err := HandleCStatus(ctx, &status, "updateFieldRawDataSize failed"); err != nil { return err } diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index aa047ab1e8..98d0464ee6 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -54,9 +54,9 @@ type Segment interface { HasRawData(fieldID int64) bool // Modification related - Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error - Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error - LoadDeltaData(deltaData *storage.DeleteData) error + Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error + Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error + LoadDeltaData(ctx context.Context, deltaData *storage.DeleteData) error LastDeltaTimestamp() uint64 Release() diff --git a/internal/querynodev2/segments/segment_l0.go b/internal/querynodev2/segments/segment_l0.go index 285939db79..e0a6b7b90f 100644 --- a/internal/querynodev2/segments/segment_l0.go +++ b/internal/querynodev2/segments/segment_l0.go @@ -131,15 +131,15 @@ func (s *L0Segment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorep return nil, nil } -func (s *L0Segment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error { +func (s *L0Segment) Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error { return merr.WrapErrIoFailedReason("insert not supported for L0 segment") } -func (s *L0Segment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error { +func (s *L0Segment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error { return merr.WrapErrIoFailedReason("delete not supported for L0 segment") } -func (s *L0Segment) LoadDeltaData(deltaData *storage.DeleteData) error { +func (s *L0Segment) LoadDeltaData(ctx context.Context, deltaData *storage.DeleteData) error { s.dataGuard.Lock() defer s.dataGuard.Unlock() diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 9d5b30fe79..693d736b59 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -213,6 +213,7 @@ func (loader *segmentLoader) Load(ctx context.Context, } segment, err := NewSegment( + ctx, collection, segmentID, partitionID, @@ -366,7 +367,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer memoryUsage := hardware.GetUsedMemoryCount() totalMemory := hardware.GetMemoryCount() - diskUsage, err := GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue()) + diskUsage, err := GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue()) if err != nil { return resource, 0, errors.Wrap(err, "get local used size failed") } @@ -583,7 +584,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, } if !typeutil.IsVectorType(field.GetDataType()) && !segment.HasRawData(fieldID) { log.Info("field index doesn't include raw data, load binlog...", zap.Int64("fieldID", fieldID), zap.String("index", info.IndexInfo.GetIndexName())) - if err = segment.LoadFieldData(fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, true); err != nil { + if err = segment.LoadFieldData(ctx, fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, true); err != nil { log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err)) return err } @@ -592,7 +593,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, if err := loader.loadSealedSegmentFields(ctx, segment, fieldBinlogs, loadInfo.GetNumOfRows()); err != nil { return err } - if err := segment.AddFieldDataInfo(loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil { + if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil { return err } // https://github.com/milvus-io/milvus/23654 @@ -601,7 +602,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, return err } } else { - if err := segment.LoadMultiFieldData(loadInfo.GetNumOfRows(), loadInfo.BinlogPaths); err != nil { + if err := segment.LoadMultiFieldData(ctx, loadInfo.GetNumOfRows(), loadInfo.BinlogPaths); err != nil { return err } } @@ -651,7 +652,8 @@ func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segmen fieldBinLog := field fieldID := field.FieldID runningGroup.Go(func() error { - return segment.LoadFieldData(fieldID, + return segment.LoadFieldData(ctx, + fieldID, rowCount, fieldBinLog, common.IsFieldMmapEnabled(collection.Schema(), fieldID), @@ -701,7 +703,7 @@ func (loader *segmentLoader) loadFieldsIndex(ctx context.Context, return err } if typeutil.IsVariableDataType(field.GetDataType()) { - err = segment.UpdateFieldRawDataSize(numRows, fieldInfo.FieldBinlog) + err = segment.UpdateFieldRawDataSize(ctx, numRows, fieldInfo.FieldBinlog) if err != nil { return err } @@ -731,7 +733,7 @@ func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalS return merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load field index") } - return segment.LoadIndex(indexInfo, fieldType) + return segment.LoadIndex(ctx, indexInfo, fieldType) } func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet, @@ -785,7 +787,7 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6 } func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error { - log := log.With( + log := log.Ctx(ctx).With( zap.Int64("segmentID", segment.ID()), ) dCodec := storage.DeleteCodec{} @@ -817,7 +819,7 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, return err } - err = segment.LoadDeltaData(deltaData) + err = segment.LoadDeltaData(ctx, deltaData) if err != nil { return err } @@ -936,7 +938,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn return 0, 0, errors.New("get memory failed when checkSegmentSize") } - localDiskUsage, err := GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue()) + localDiskUsage, err := GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue()) if err != nil { return 0, 0, errors.Wrap(err, "get local used size failed") } diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 5abe64d3a5..372b1d2de7 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -61,7 +61,8 @@ func (suite *SegmentSuite) SetupTest() { ) suite.collection = suite.manager.Collection.Get(suite.collectionID) - suite.sealed, err = NewSegment(suite.collection, + suite.sealed, err = NewSegment(ctx, + suite.collection, suite.segmentID, suite.partitionID, suite.collectionID, @@ -84,11 +85,12 @@ func (suite *SegmentSuite) SetupTest() { ) suite.Require().NoError(err) for _, binlog := range binlogs { - err = suite.sealed.(*LocalSegment).LoadFieldData(binlog.FieldID, int64(msgLength), binlog, false) + err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false) suite.Require().NoError(err) } - suite.growing, err = NewSegment(suite.collection, + suite.growing, err = NewSegment(ctx, + suite.collection, suite.segmentID+1, suite.partitionID, suite.collectionID, @@ -105,7 +107,7 @@ func (suite *SegmentSuite) SetupTest() { suite.Require().NoError(err) insertRecord, err := storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg) suite.Require().NoError(err) - err = suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) + err = suite.growing.Insert(ctx, insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) suite.Require().NoError(err) suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed) @@ -121,12 +123,14 @@ func (suite *SegmentSuite) TearDownTest() { } func (suite *SegmentSuite) TestDelete() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() pks, err := storage.GenInt64PrimaryKeys(0, 1) suite.NoError(err) // Test for sealed rowNum := suite.sealed.RowNum() - err = suite.sealed.Delete(pks, []uint64{1000, 1000}) + err = suite.sealed.Delete(ctx, pks, []uint64{1000, 1000}) suite.NoError(err) suite.Equal(rowNum-int64(len(pks)), suite.sealed.RowNum()) @@ -134,7 +138,7 @@ func (suite *SegmentSuite) TestDelete() { // Test for growing rowNum = suite.growing.RowNum() - err = suite.growing.Delete(pks, []uint64{1000, 1000}) + err = suite.growing.Delete(ctx, pks, []uint64{1000, 1000}) suite.NoError(err) suite.Equal(rowNum-int64(len(pks)), suite.growing.RowNum()) diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index a97d48e0f0..4f5c7decfd 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -295,7 +295,7 @@ func (node *QueryNode) Init() error { node.factory.Init(paramtable.Get()) localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue() - localUsedSize, err := segments.GetLocalUsedSize(localRootPath) + localUsedSize, err := segments.GetLocalUsedSize(node.ctx, localRootPath) if err != nil { log.Warn("get local used size failed", zap.Error(err)) initError = err diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index d2e42cdafc..ed32fb4c32 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -221,6 +221,7 @@ func (suite *QueryNodeSuite) TestStop() { schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64) collection := segments.NewCollection(1, schema, nil, querypb.LoadType_LoadCollection) segment, err := segments.NewSegment( + context.Background(), collection, 100, 10, diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 7aab0f563d..7ee029a09a 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1465,7 +1465,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys()) for _, segment := range segments { - err := segment.Delete(pks, req.GetTimestamps()) + err := segment.Delete(ctx, pks, req.GetTimestamps()) if err != nil { log.Warn("segment delete failed", zap.Error(err)) return merr.Status(err), nil diff --git a/internal/querynodev2/tasks/query_stream_task.go b/internal/querynodev2/tasks/query_stream_task.go index 450e9e91a6..96149cc172 100644 --- a/internal/querynodev2/tasks/query_stream_task.go +++ b/internal/querynodev2/tasks/query_stream_task.go @@ -48,6 +48,7 @@ func (t *QueryStreamTask) PreExecute() error { func (t *QueryStreamTask) Execute() error { retrievePlan, err := segments.NewRetrievePlan( + t.ctx, t.collection, t.req.Req.GetSerializedExprPlan(), t.req.Req.GetMvccTimestamp(), diff --git a/internal/querynodev2/tasks/query_task.go b/internal/querynodev2/tasks/query_task.go index 73fff23414..7bc7a62956 100644 --- a/internal/querynodev2/tasks/query_task.go +++ b/internal/querynodev2/tasks/query_task.go @@ -80,6 +80,7 @@ func (t *QueryTask) Execute() error { tr := timerecord.NewTimeRecorderWithTrace(t.ctx, "QueryTask") retrievePlan, err := segments.NewRetrievePlan( + t.ctx, t.collection, t.req.Req.GetSerializedExprPlan(), t.req.Req.GetMvccTimestamp(), diff --git a/internal/querynodev2/tasks/task.go b/internal/querynodev2/tasks/task.go index ee4abcc23a..e86339a382 100644 --- a/internal/querynodev2/tasks/task.go +++ b/internal/querynodev2/tasks/task.go @@ -118,7 +118,7 @@ func (t *SearchTask) Execute() error { req := t.req t.combinePlaceHolderGroups() - searchReq, err := segments.NewSearchRequest(t.collection, req, t.placeholderGroup) + searchReq, err := segments.NewSearchRequest(t.ctx, t.collection, req, t.placeholderGroup) if err != nil { return err } @@ -182,6 +182,7 @@ func (t *SearchTask) Execute() error { tr.RecordSpan() blobs, err := segments.ReduceSearchResultsAndFillData( + t.ctx, searchReq.Plan(), results, int64(len(results)), @@ -199,7 +200,7 @@ func (t *SearchTask) Execute() error { metrics.ReduceSegments). Observe(float64(tr.RecordSpan().Milliseconds())) for i := range t.originNqs { - blob, err := segments.GetSearchResultDataBlob(blobs, i) + blob, err := segments.GetSearchResultDataBlob(t.ctx, blobs, i) if err != nil { return err }