mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 04:19:18 +08:00
Fix golint error in querynode (#9597)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
75716e0deb
commit
a7dc9bdf15
@ -28,7 +28,7 @@ type insertNode struct {
|
||||
replica ReplicaInterface
|
||||
}
|
||||
|
||||
type InsertData struct {
|
||||
type insertData struct {
|
||||
insertIDs map[UniqueID][]int64
|
||||
insertTimestamps map[UniqueID][]Timestamp
|
||||
insertRecords map[UniqueID][]*commonpb.Blob
|
||||
@ -53,7 +53,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
// TODO: add error handling
|
||||
}
|
||||
|
||||
insertData := InsertData{
|
||||
iData := insertData{
|
||||
insertIDs: make(map[UniqueID][]int64),
|
||||
insertTimestamps: make(map[UniqueID][]Timestamp),
|
||||
insertRecords: make(map[UniqueID][]*commonpb.Blob),
|
||||
@ -91,34 +91,34 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
}
|
||||
}
|
||||
|
||||
insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
|
||||
insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
|
||||
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
|
||||
iData.insertIDs[task.SegmentID] = append(iData.insertIDs[task.SegmentID], task.RowIDs...)
|
||||
iData.insertTimestamps[task.SegmentID] = append(iData.insertTimestamps[task.SegmentID], task.Timestamps...)
|
||||
iData.insertRecords[task.SegmentID] = append(iData.insertRecords[task.SegmentID], task.RowData...)
|
||||
}
|
||||
|
||||
// 2. do preInsert
|
||||
for segmentID := range insertData.insertRecords {
|
||||
for segmentID := range iData.insertRecords {
|
||||
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
|
||||
if err != nil {
|
||||
log.Warn(err.Error())
|
||||
}
|
||||
|
||||
var numOfRecords = len(insertData.insertRecords[segmentID])
|
||||
var numOfRecords = len(iData.insertRecords[segmentID])
|
||||
if targetSegment != nil {
|
||||
offset, err := targetSegment.segmentPreInsert(numOfRecords)
|
||||
if err != nil {
|
||||
log.Warn(err.Error())
|
||||
}
|
||||
insertData.insertOffset[segmentID] = offset
|
||||
iData.insertOffset[segmentID] = offset
|
||||
log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID))
|
||||
}
|
||||
}
|
||||
|
||||
// 3. do insert
|
||||
wg := sync.WaitGroup{}
|
||||
for segmentID := range insertData.insertRecords {
|
||||
for segmentID := range iData.insertRecords {
|
||||
wg.Add(1)
|
||||
go iNode.insert(&insertData, segmentID, &wg)
|
||||
go iNode.insert(&iData, segmentID, &wg)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
@ -132,7 +132,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
return []Msg{res}
|
||||
}
|
||||
|
||||
func (iNode *insertNode) insert(insertData *InsertData, segmentID UniqueID, wg *sync.WaitGroup) {
|
||||
func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) {
|
||||
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
|
||||
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
|
||||
if err != nil {
|
||||
@ -147,10 +147,10 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID UniqueID, wg *
|
||||
return
|
||||
}
|
||||
|
||||
ids := insertData.insertIDs[segmentID]
|
||||
timestamps := insertData.insertTimestamps[segmentID]
|
||||
records := insertData.insertRecords[segmentID]
|
||||
offsets := insertData.insertOffset[segmentID]
|
||||
ids := iData.insertIDs[segmentID]
|
||||
timestamps := iData.insertTimestamps[segmentID]
|
||||
records := iData.insertRecords[segmentID]
|
||||
offsets := iData.insertOffset[segmentID]
|
||||
|
||||
err = targetSegment.segmentInsert(offsets, &ids, ×tamps, &records)
|
||||
if err != nil {
|
||||
@ -160,7 +160,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID UniqueID, wg *
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("Do insert done", zap.Int("len", len(insertData.insertIDs[segmentID])),
|
||||
log.Debug("Do insert done", zap.Int("len", len(iData.insertIDs[segmentID])),
|
||||
zap.Int64("segmentID", segmentID))
|
||||
wg.Done()
|
||||
}
|
||||
|
@ -22,13 +22,13 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
||||
)
|
||||
|
||||
func genFlowGraphInsertData() (*InsertData, error) {
|
||||
func genFlowGraphInsertData() (*insertData, error) {
|
||||
insertMsg, err := genSimpleInsertMsg()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
iData := &insertData{
|
||||
insertIDs: map[UniqueID][]UniqueID{
|
||||
defaultSegmentID: insertMsg.RowIDs,
|
||||
},
|
||||
@ -42,7 +42,7 @@ func genFlowGraphInsertData() (*InsertData, error) {
|
||||
defaultSegmentID: 0,
|
||||
},
|
||||
}
|
||||
return insertData, nil
|
||||
return iData, nil
|
||||
}
|
||||
|
||||
func TestFlowGraphInsertNode_insert(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user