milvus/internal/querynodev2/pipeline/delete_node.go
yah01 081572d31c
Refactor QueryNode (#21625)
Signed-off-by: yah01 <yang.cen@zilliz.com>
Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
Co-authored-by: aoiasd <zhicheng.yue@zilliz.com>
2023-03-27 00:42:00 +08:00

100 lines
3.2 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"fmt"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/storage"
base "github.com/milvus-io/milvus/internal/util/pipeline"
"github.com/samber/lo"
"go.uber.org/zap"
)
type deleteNode struct {
*BaseNode
collectionID UniqueID
channel string
manager *DataManager
tSafeManager TSafeManager
delegator delegator.ShardDelegator
}
//addDeleteData find the segment of delete column in DeleteMsg and save in deleteData
func (dNode *deleteNode) addDeleteData(deleteDatas map[UniqueID]*delegator.DeleteData, msg *DeleteMsg) {
deleteData, ok := deleteDatas[msg.PartitionID]
if !ok {
deleteData = &delegator.DeleteData{
PartitionID: msg.PartitionID,
}
deleteDatas[msg.PartitionID] = deleteData
}
pks := storage.ParseIDs2PrimaryKeys(msg.PrimaryKeys)
deleteData.PrimaryKeys = append(deleteData.PrimaryKeys, pks...)
deleteData.Timestamps = append(deleteData.Timestamps, msg.Timestamps...)
deleteData.RowCount += int64(len(pks))
log.Info("pipeline fetch delete msg",
zap.Int64("collectionID", dNode.collectionID),
zap.Int64("partitionID", msg.PartitionID),
zap.Int("insertRowNum", len(pks)),
zap.Uint64("timestampMin", msg.BeginTimestamp),
zap.Uint64("timestampMax", msg.EndTimestamp))
}
func (dNode *deleteNode) Operate(in Msg) Msg {
nodeMsg := in.(*deleteNodeMsg)
// partition id = > DeleteData
deleteDatas := make(map[UniqueID]*delegator.DeleteData)
for _, msg := range nodeMsg.deleteMsgs {
dNode.addDeleteData(deleteDatas, msg)
}
if len(deleteDatas) > 0 {
//do Delete, use ts range max as ts
dNode.delegator.ProcessDelete(lo.Values(deleteDatas), nodeMsg.timeRange.timestampMax)
}
//update tSafe
err := dNode.tSafeManager.Set(dNode.channel, nodeMsg.timeRange.timestampMax)
if err != nil {
// should not happen, QueryNode should addTSafe before start pipeline
panic(fmt.Errorf("serviceTimeNode setTSafe timeout, collectionID = %d, err = %s", dNode.collectionID, err))
}
return nil
}
func newDeleteNode(
collectionID UniqueID, channel string,
manager *DataManager, tSafeManager TSafeManager, delegator delegator.ShardDelegator,
maxQueueLength int32,
) *deleteNode {
return &deleteNode{
BaseNode: base.NewBaseNode(fmt.Sprintf("DeleteNode-%s", channel), maxQueueLength),
collectionID: collectionID,
channel: channel,
manager: manager,
tSafeManager: tSafeManager,
delegator: delegator,
}
}