Add gcNode, and remove msg's downstream index

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-01-06 14:02:25 +08:00 committed by yefu.chen
parent 1cfc2ff0a5
commit 810be533ab
10 changed files with 150 additions and 57 deletions

View File

@ -127,7 +127,6 @@ install: all
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/master $(GOPATH)/bin/master
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/proxy $(GOPATH)/bin/proxy
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/writenode $(GOPATH)/bin/writenode
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/indexbuilder $(GOPATH)/bin/indexbuilder
@mkdir -p $(LIBRARY_PATH) && cp -f $(PWD)/internal/core/output/lib/* $(LIBRARY_PATH)
@echo "Installation successful."
@ -135,10 +134,7 @@ clean:
@echo "Cleaning up all the generated files"
@find . -name '*.test' | xargs rm -fv
@find . -name '*~' | xargs rm -fv
@rm -rf bin/
@rm -rf lib/
@rm -rf $(GOPATH)/bin/master
@rm -rf $(GOPATH)/bin/proxy
@rm -rf $(GOPATH)/bin/querynode
@rm -rf $(GOPATH)/bin/writenode
@rm -rf $(GOPATH)/bin/indexbuilder
@rm -rvf querynode
@rm -rvf master
@rm -rvf proxy
@rm -rvf writenode

View File

@ -48,6 +48,7 @@ func (dsService *dataSyncService) initNodes() {
var insertNode Node = newInsertNode(dsService.replica)
var serviceTimeNode Node = newServiceTimeNode(dsService.replica)
var gcNode Node = newGCNode(dsService.replica)
dsService.fg.AddNode(&dmStreamNode)
dsService.fg.AddNode(&ddStreamNode)
@ -57,6 +58,7 @@ func (dsService *dataSyncService) initNodes() {
dsService.fg.AddNode(&insertNode)
dsService.fg.AddNode(&serviceTimeNode)
dsService.fg.AddNode(&gcNode)
// dmStreamNode
var err = dsService.fg.SetEdges(dmStreamNode.Name(),
@ -106,9 +108,17 @@ func (dsService *dataSyncService) initNodes() {
// serviceTimeNode
err = dsService.fg.SetEdges(serviceTimeNode.Name(),
[]string{insertNode.Name()},
[]string{},
[]string{gcNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", serviceTimeNode.Name())
}
// gcNode
err = dsService.fg.SetEdges(gcNode.Name(),
[]string{serviceTimeNode.Name()},
[]string{})
if err != nil {
log.Fatal("set edges failed in node:", gcNode.Name())
}
}

View File

@ -44,6 +44,11 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
},
}
ddNode.ddMsg = &ddMsg
gcRecord := gcRecord{
collections: make([]UniqueID, 0),
partitions: make([]partitionWithID, 0),
}
ddNode.ddMsg.gcRecord = &gcRecord
// sort tsMessages
tsMessages := msMsg.TsMessages()
@ -115,11 +120,11 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
collectionID := msg.CollectionID
err := ddNode.replica.removeCollection(collectionID)
if err != nil {
log.Println(err)
return
}
//err := ddNode.replica.removeCollection(collectionID)
//if err != nil {
// log.Println(err)
// return
//}
collectionName := msg.CollectionName.CollectionName
ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName],
@ -127,6 +132,8 @@ func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
createOrDrop: false,
timestamp: msg.Timestamp,
})
ddNode.ddMsg.gcRecord.collections = append(ddNode.ddMsg.gcRecord.collections, collectionID)
}
func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
@ -150,17 +157,22 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
collectionID := msg.CollectionID
partitionTag := msg.PartitionName.Tag
err := ddNode.replica.removePartition(collectionID, partitionTag)
if err != nil {
log.Println(err)
return
}
//err := ddNode.replica.removePartition(collectionID, partitionTag)
//if err != nil {
// log.Println(err)
// return
//}
ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag],
metaOperateRecord{
createOrDrop: false,
timestamp: msg.Timestamp,
})
ddNode.ddMsg.gcRecord.partitions = append(ddNode.ddMsg.gcRecord.partitions, partitionWithID{
partitionTag: partitionTag,
collectionID: collectionID,
})
}
func newDDNode(replica collectionReplica) *ddNode {

View File

@ -2,6 +2,7 @@ package querynode
import (
"log"
"math"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -59,6 +60,7 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
}
}
iMsg.gcRecord = ddMsg.gcRecord
var res Msg = &iMsg
return []*Msg{&res}
}
@ -81,17 +83,35 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
log.Println("Error, misaligned messages detected")
return nil
}
tmpTimestamps := make([]Timestamp, 0)
tmpRowIDs := make([]int64, 0)
tmpRowData := make([]*commonpb.Blob, 0)
targetTimestamp := records[len(records)-1].timestamp
// calculate valid time range
timeBegin := Timestamp(0)
timeEnd := Timestamp(math.MaxUint64)
for _, record := range records {
if record.createOrDrop && timeBegin < record.timestamp {
timeBegin = record.timestamp
}
if !record.createOrDrop && timeEnd > record.timestamp {
timeEnd = record.timestamp
}
}
for i, t := range msg.Timestamps {
if t >= targetTimestamp {
if t >= timeBegin && t <= timeEnd {
tmpTimestamps = append(tmpTimestamps, t)
tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i])
tmpRowData = append(tmpRowData, msg.RowData[i])
}
}
if len(tmpRowIDs) <= 0 {
return nil
}
msg.Timestamps = tmpTimestamps
msg.RowIDs = tmpRowIDs
msg.RowData = tmpRowData

View File

@ -0,0 +1,61 @@
package querynode
import (
"log"
)
// gcNode is the flow-graph node whose Operate method removes dropped
// collections and partitions from the replica.
type gcNode struct {
// BaseNode is embedded; newGCNode sets its max queue length and max parallelism.
BaseNode
// replica is the collection replica on which Operate calls
// removeCollection and removePartition.
replica collectionReplica
}
// Name returns the fixed identifier of this node, "gcNode".
func (n *gcNode) Name() string {
	const nodeName = "gcNode"
	return nodeName
}
// Operate takes the gcMsg delivered in in[0] and removes every collection and
// partition listed in its gcRecord from the replica.
//
// Removal failures are logged and do not stop the remaining removals.
// Invalid input (no message, a nil message, a message that is not a *gcMsg,
// or a gcMsg without a gcRecord) is skipped instead of causing a panic.
//
// It always returns nil: no message is forwarded from this node.
func (n *gcNode) Operate(in []*Msg) []*Msg {
	//fmt.Println("Do gcNode operation")

	if len(in) != 1 {
		log.Println("Invalid operate message input in gcNode, input length = ", len(in))
		// TODO: add error handling
	}
	// With no usable first element, indexing or dereferencing in[0] would panic.
	if len(in) == 0 || in[0] == nil {
		return nil
	}

	msg, ok := (*in[0]).(*gcMsg)
	if !ok {
		log.Println("type assertion failed for gcMsg")
		// TODO: add error handling
		return nil
	}
	// A nil message or a message without a record has nothing to collect.
	if msg == nil || msg.gcRecord == nil {
		return nil
	}

	// drop collections
	for _, collectionID := range msg.gcRecord.collections {
		if err := n.replica.removeCollection(collectionID); err != nil {
			log.Println(err)
		}
	}

	// drop partitions
	for _, partition := range msg.gcRecord.partitions {
		if err := n.replica.removePartition(partition.collectionID, partition.partitionTag); err != nil {
			log.Println(err)
		}
	}

	return nil
}
// newGCNode builds a gcNode bound to the given replica. The embedded BaseNode
// is configured with the flow-graph queue length and parallelism limits read
// from Params.
func newGCNode(replica collectionReplica) *gcNode {
	var base BaseNode
	base.SetMaxQueueLength(Params.FlowGraphMaxQueueLength)
	base.SetMaxParallelism(Params.FlowGraphMaxParallelism)

	node := new(gcNode)
	node.BaseNode = base
	node.replica = replica
	return node
}

View File

@ -90,6 +90,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
wg.Wait()
var res Msg = &serviceTimeMsg{
gcRecord: iMsg.gcRecord,
timeRange: iMsg.timeRange,
}
return []*Msg{&res}

View File

@ -16,6 +16,7 @@ type key2SegMsg struct {
type ddMsg struct {
collectionRecords map[string][]metaOperateRecord
partitionRecords map[string][]metaOperateRecord
gcRecord *gcRecord
timeRange TimeRange
}
@ -26,6 +27,7 @@ type metaOperateRecord struct {
type insertMsg struct {
insertMessages []*msgstream.InsertMsg
gcRecord *gcRecord
timeRange TimeRange
}
@ -35,6 +37,12 @@ type deleteMsg struct {
}
type serviceTimeMsg struct {
gcRecord *gcRecord
timeRange TimeRange
}
type gcMsg struct {
gcRecord *gcRecord
timeRange TimeRange
}
@ -55,42 +63,39 @@ type DeletePreprocessData struct {
count int32
}
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
return ksMsg.timeRange.timestampMax
// TODO: replace partitionWithID by partition id
type partitionWithID struct {
partitionTag string
collectionID UniqueID
}
func (ksMsg *key2SegMsg) DownStreamNodeIdx() int {
return 0
type gcRecord struct {
// collections and partitions to be dropped
collections []UniqueID
// TODO: use partition id
partitions []partitionWithID
}
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
return ksMsg.timeRange.timestampMax
}
func (suMsg *ddMsg) TimeTick() Timestamp {
return suMsg.timeRange.timestampMax
}
func (suMsg *ddMsg) DownStreamNodeIdx() int {
return 0
}
func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
}
func (iMsg *insertMsg) DownStreamNodeIdx() int {
return 0
}
func (dMsg *deleteMsg) TimeTick() Timestamp {
return dMsg.timeRange.timestampMax
}
func (dMsg *deleteMsg) DownStreamNodeIdx() int {
return 0
}
func (stMsg *serviceTimeMsg) TimeTick() Timestamp {
return stMsg.timeRange.timestampMax
}
func (stMsg *serviceTimeMsg) DownStreamNodeIdx() int {
return 0
func (gcMsg *gcMsg) TimeTick() Timestamp {
return gcMsg.timeRange.timestampMax
}

View File

@ -30,7 +30,12 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
// update service time
stNode.replica.getTSafe().set(serviceTimeMsg.timeRange.timestampMax)
//fmt.Println("update tSafe to:", getPhysicalTime(serviceTimeMsg.timeRange.timestampMax))
return nil
var res Msg = &gcMsg{
gcRecord: serviceTimeMsg.gcRecord,
timeRange: serviceTimeMsg.timeRange,
}
return []*Msg{&res}
}
func newServiceTimeNode(replica collectionReplica) *serviceTimeNode {

View File

@ -4,7 +4,6 @@ import "github.com/zilliztech/milvus-distributed/internal/msgstream"
type Msg interface {
TimeTick() Timestamp
DownStreamNodeIdx() int
}
type MsgStreamMsg struct {

View File

@ -46,30 +46,14 @@ func (ksMsg *key2SegMsg) TimeTick() Timestamp {
return ksMsg.timeRange.timestampMax
}
func (ksMsg *key2SegMsg) DownStreamNodeIdx() int {
return 0
}
func (suMsg *ddMsg) TimeTick() Timestamp {
return suMsg.timeRange.timestampMax
}
func (suMsg *ddMsg) DownStreamNodeIdx() int {
return 0
}
func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
}
func (iMsg *insertMsg) DownStreamNodeIdx() int {
return 0
}
func (dMsg *deleteMsg) TimeTick() Timestamp {
return dMsg.timeRange.timestampMax
}
func (dMsg *deleteMsg) DownStreamNodeIdx() int {
return 0
}