diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 1d928e8947..b39e0d3719 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -28,7 +28,7 @@ type insertNode struct { replica ReplicaInterface } -type InsertData struct { +type insertData struct { insertIDs map[UniqueID][]int64 insertTimestamps map[UniqueID][]Timestamp insertRecords map[UniqueID][]*commonpb.Blob @@ -53,7 +53,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // TODO: add error handling } - insertData := InsertData{ + iData := insertData{ insertIDs: make(map[UniqueID][]int64), insertTimestamps: make(map[UniqueID][]Timestamp), insertRecords: make(map[UniqueID][]*commonpb.Blob), @@ -91,34 +91,34 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } } - insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...) - insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...) - insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...) + iData.insertIDs[task.SegmentID] = append(iData.insertIDs[task.SegmentID], task.RowIDs...) + iData.insertTimestamps[task.SegmentID] = append(iData.insertTimestamps[task.SegmentID], task.Timestamps...) + iData.insertRecords[task.SegmentID] = append(iData.insertRecords[task.SegmentID], task.RowData...) } // 2. do preInsert - for segmentID := range insertData.insertRecords { + for segmentID := range iData.insertRecords { var targetSegment, err = iNode.replica.getSegmentByID(segmentID) if err != nil { log.Warn(err.Error()) } - var numOfRecords = len(insertData.insertRecords[segmentID]) + var numOfRecords = len(iData.insertRecords[segmentID]) if targetSegment != nil { offset, err := targetSegment.segmentPreInsert(numOfRecords) if err != nil { log.Warn(err.Error()) } - insertData.insertOffset[segmentID] = offset + iData.insertOffset[segmentID] = offset log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID)) } } // 3. do insert wg := sync.WaitGroup{} - for segmentID := range insertData.insertRecords { + for segmentID := range iData.insertRecords { wg.Add(1) - go iNode.insert(&insertData, segmentID, &wg) + go iNode.insert(&iData, segmentID, &wg) } wg.Wait() @@ -132,7 +132,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { return []Msg{res} } -func (iNode *insertNode) insert(insertData *InsertData, segmentID UniqueID, wg *sync.WaitGroup) { +func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) { log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID)) var targetSegment, err = iNode.replica.getSegmentByID(segmentID) if err != nil { @@ -147,10 +147,10 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID UniqueID, wg * return } - ids := insertData.insertIDs[segmentID] - timestamps := insertData.insertTimestamps[segmentID] - records := insertData.insertRecords[segmentID] - offsets := insertData.insertOffset[segmentID] + ids := iData.insertIDs[segmentID] + timestamps := iData.insertTimestamps[segmentID] + records := iData.insertRecords[segmentID] + offsets := iData.insertOffset[segmentID] err = targetSegment.segmentInsert(offsets, &ids, ×tamps, &records) if err != nil { @@ -160,7 +160,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID UniqueID, wg * return } - log.Debug("Do insert done", zap.Int("len", len(insertData.insertIDs[segmentID])), + log.Debug("Do insert done", zap.Int("len", len(iData.insertIDs[segmentID])), zap.Int64("segmentID", segmentID)) wg.Done() } diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go index fa61052ce0..e25b8114c5 100644 --- a/internal/querynode/flow_graph_insert_node_test.go +++ b/internal/querynode/flow_graph_insert_node_test.go @@ -22,13 +22,13 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" ) -func genFlowGraphInsertData() (*InsertData, error) { +func genFlowGraphInsertData() (*insertData, error) { insertMsg, err := genSimpleInsertMsg() if err != nil { return nil, err } - insertData := &InsertData{ + iData := &insertData{ insertIDs: map[UniqueID][]UniqueID{ defaultSegmentID: insertMsg.RowIDs, }, @@ -42,7 +42,7 @@ func genFlowGraphInsertData() (*InsertData, error) { defaultSegmentID: 0, }, } - return insertData, nil + return iData, nil } func TestFlowGraphInsertNode_insert(t *testing.T) {