Refactor ddl binlog flush (#5303)
DDL won't auto-flush now. See also: #5289, #5220. Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in: parent a7bb701f73, commit 1c779f3efd
@@ -12,15 +12,16 @@ Before this design, DataNode buffers DDL chunks by collection, flushes all buffe
 Now in [DataNode Recovery Design](datanode_recover_design_0513_2021.md), flowgraph : vchannel = 1 : 1, and insert
 data of one segment is always in one vchannel. So each flowgraph concerns only about ONE specific collection. For
-DDL channels, one flowgraph only cares about DDL operations of one collection.
+DDL channels, one flowgraph only cares about DDL operations of one collection. In this case,
+I don't think it's necessary to auto-flush ddl anymore.

 ## Goals

-- Flowgraph knows about which segment/collection to concern.
-- DDNode update masPositions once it buffers ddl about the collection
-- DDNode buffers binlog Paths generated by auto-flush
-- In manul-flush, a background flush-complete goroutinue waits for DDNode and InsertBufferNode both done flushing,
-  waiting for both binlog paths.
+- Flowgraph knows which segment/collection to concern itself with.
+- DDNode updates masPositions once it buffers ddl about the collection.
+- DDNode won't auto-flush.
+- In manual-flush, a background flush-complete goroutine waits for both DDNode and InsertBufferNode to finish
+  flushing, waiting for both binlog paths.

 ## Detailed design

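A minimal sketch of the manual-flush coordination described in the goals above: a background goroutine waits until both the DDNode and the InsertBufferNode have reported their binlog paths before the flush is considered complete. The types and channel names here are simplified stand-ins for illustration, not the actual Milvus types.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the real binlog metadata types.
type ddlBinlogMeta struct{ tsPath, ddlPath string }
type insertBinlogPaths struct {
	segmentID int64
	paths     []string
}

// waitFlushComplete mirrors the "flush-complete goroutine" idea:
// it blocks until both nodes have delivered their binlog paths,
// then reports the combined result.
func waitFlushComplete(ddlCh <-chan *ddlBinlogMeta, insertCh <-chan *insertBinlogPaths, done chan<- string) {
	ddl := <-ddlCh    // DDNode finished flushing ddl binlogs
	ins := <-insertCh // InsertBufferNode finished flushing insert binlogs
	done <- fmt.Sprintf("flush complete: ddl=%v insert=%v", ddl, ins)
}

func main() {
	ddlCh := make(chan *ddlBinlogMeta)
	insertCh := make(chan *insertBinlogPaths)
	done := make(chan string)

	go waitFlushComplete(ddlCh, insertCh, done)

	// The two flowgraph nodes flush independently and report back.
	go func() {
		time.Sleep(10 * time.Millisecond)
		ddlCh <- &ddlBinlogMeta{tsPath: "ts/1", ddlPath: "ddl/1"}
	}()
	go func() {
		time.Sleep(20 * time.Millisecond)
		insertCh <- &insertBinlogPaths{segmentID: 1, paths: []string{"insert/1"}}
	}()

	fmt.Println(<-done)
}
```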
@@ -55,7 +56,7 @@ message SaveBinlogPathsRequest {
     int64 collectionID = 3;
     ID2PathList field2BinlogPaths = 4;
     repeated DDLBinlogMeta = 5;
-    repeated internal.MsgPosition start_positions = 7;
+    repeated internal.MsgPosition start_positions = 7;
     repeated internal.MsgPosition end_positions = 8;
 }
 ```

@@ -134,16 +134,13 @@ func (bm *binlogMeta) getSegmentBinlogMeta(segmentID UniqueID) (metas []*datapb.
 // SaveDDLBinlogMetaTxn stores timestamp and ddl binlog path pair into etcd in a transaction.
 // ddl binlog meta key:
 //   ${prefix}/${collectionID}/${idx}
-func (bm *binlogMeta) SaveDDLBinlogMetaTxn(collID UniqueID, tsPath string, ddlPath string) error {
+func (bm *binlogMeta) SaveDDLBinlogMetaTxn(collID UniqueID, ddlBinlogMeta *datapb.DDLBinlogMeta) error {

 	uniqueKey, err := bm.genKey(true, collID)
 	if err != nil {
 		return err
 	}
-	binlogPathPair := proto.MarshalTextString(&datapb.DDLBinlogMeta{
-		DdlBinlogPath: ddlPath,
-		TsBinlogPath:  tsPath,
-	})
+	binlogPathPair := proto.MarshalTextString(ddlBinlogMeta)

 	return bm.client.Save(path.Join(Params.DDLFlushMetaSubPath, uniqueKey), binlogPathPair)
 }
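A small illustration of the key layout described in the comment above (`${prefix}/${collectionID}/${idx}`). The `fakeKV`, `saveDDLBinlogMeta`, and JSON marshaling pieces are simplified stand-ins for the etcd-backed client and `proto.MarshalTextString`, not the actual binlogMeta implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"path"
	"strconv"
)

// Stand-in for datapb.DDLBinlogMeta: the pair of binlog paths stored per flush.
type ddlBinlogMeta struct {
	TsBinlogPath  string `json:"ts_binlog_path"`
	DdlBinlogPath string `json:"ddl_binlog_path"`
}

// fakeKV stands in for the etcd-backed kv client used by binlogMeta.
type fakeKV map[string]string

func (kv fakeKV) Save(key, value string) error {
	kv[key] = value
	return nil
}

// saveDDLBinlogMeta mimics the key scheme of SaveDDLBinlogMetaTxn:
// ${prefix}/${collectionID}/${idx}, where idx comes from an ID allocator.
func saveDDLBinlogMeta(kv fakeKV, prefix string, collID, idx int64, meta *ddlBinlogMeta) error {
	uniqueKey := path.Join(strconv.FormatInt(collID, 10), strconv.FormatInt(idx, 10))
	value, err := json.Marshal(meta) // the real code uses proto.MarshalTextString
	if err != nil {
		return err
	}
	return kv.Save(path.Join(prefix, uniqueKey), string(value))
}

func main() {
	kv := fakeKV{}
	_ = saveDDLBinlogMeta(kv, "datanode/ddl_flush_meta", 888888, 1, &ddlBinlogMeta{
		TsBinlogPath:  "a/b/c",
		DdlBinlogPath: "c/b/a",
	})
	for k, v := range kv {
		fmt.Println(k, "=>", v)
	}
}
```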
@@ -107,29 +107,4 @@ func TestMetaTable_Basic(t *testing.T) {

 		assert.ElementsMatch(t, []string{"a", "b", "c", "aa", "bb", "cc"}, paths)
 	})
-
-	t.Run("TestBasic_SaveDDLBinlogMetaTxn", func(t *testing.T) {
-		collID := UniqueID(888888)
-		tsPath := "a/b/c"
-		ddlPath := "c/b/a"
-
-		err := meta.SaveDDLBinlogMetaTxn(collID, tsPath, ddlPath)
-		assert.NoError(t, err)
-
-		metas, err := meta.getDDLBinlogMete(collID)
-		assert.NoError(t, err)
-		assert.Equal(t, 1, len(metas))
-		assert.Equal(t, "a/b/c", metas[0].GetTsBinlogPath())
-		assert.Equal(t, "c/b/a", metas[0].GetDdlBinlogPath())
-
-		err = meta.SaveDDLBinlogMetaTxn(collID, tsPath, ddlPath)
-		assert.NoError(t, err)
-
-		metas, err = meta.getDDLBinlogMete(collID)
-		assert.NoError(t, err)
-		assert.Equal(t, 2, len(metas))
-		assert.Equal(t, "a/b/c", metas[0].GetTsBinlogPath())
-		assert.Equal(t, "c/b/a", metas[0].GetDdlBinlogPath())
-	})
-
 }
@@ -106,7 +106,7 @@ func (dsService *dataSyncService) initNodes() {
 	var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory)

 	var filterDmNode Node = newFilteredDmNode()
-	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica)
+	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator)
 	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.msFactory, dsService.idAllocator)
 	var gcNode Node = newGCNode(dsService.replica)

@@ -27,6 +27,7 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
@@ -36,11 +37,12 @@ import (

 type ddNode struct {
 	BaseNode
-	ddMsg     *ddMsg
-	ddRecords *ddRecords
-	ddBuffer  *ddBuffer
-	flushMap  *sync.Map
-	inFlushCh <-chan *flushMsg
+	ddMsg       *ddMsg
+	ddRecords   *ddRecords
+	ddBuffer    *ddBuffer
+	flushMap    *sync.Map
+	inFlushCh   <-chan *flushMsg
+	idAllocator allocatorInterface

 	kv      kv.BaseKV
 	replica Replica
@@ -54,8 +56,7 @@ type ddData struct {
 }

 type ddBuffer struct {
-	ddData  map[UniqueID]*ddData // collection ID
-	maxSize int32
+	ddData map[UniqueID]*ddData // collection ID
 }

 type ddRecords struct {
@@ -63,20 +64,15 @@ type ddRecords struct {
 	partitionRecords map[UniqueID]interface{}
 }

-func (d *ddBuffer) size() int32 {
+func (d *ddBuffer) size(collectionID UniqueID) int {
 	if d.ddData == nil || len(d.ddData) <= 0 {
 		return 0
 	}

-	var size int32 = 0
-	for _, data := range d.ddData {
-		size += int32(len(data.ddRequestString))
+	if data, ok := d.ddData[collectionID]; ok {
+		return len(data.ddRequestString)
 	}
-	return size
-}
-
-func (d *ddBuffer) full() bool {
-	return d.size() >= d.maxSize
+	return 0
 }

 func (ddNode *ddNode) Name() string {
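A minimal sketch of the per-collection buffering semantics above, with simplified stand-in types (the real `ddData` also holds timestamps and event types): `size` now reports how many ddl entries are buffered for one collection, and there is no global `full()` threshold because auto-flush is gone.

```go
package main

import "fmt"

type uniqueID = int64

// ddData stand-in: only the request strings matter for sizing here.
type ddData struct {
	ddRequestString []string
}

type ddBuffer struct {
	ddData map[uniqueID]*ddData // keyed by collection ID
}

// size reports the number of buffered ddl entries for one collection only.
func (d *ddBuffer) size(collectionID uniqueID) int {
	if d.ddData == nil {
		return 0
	}
	if data, ok := d.ddData[collectionID]; ok {
		return len(data.ddRequestString)
	}
	return 0
}

func main() {
	buf := &ddBuffer{ddData: map[uniqueID]*ddData{
		100: {ddRequestString: []string{"CreateCollection", "CreatePartition"}},
	}}
	fmt.Println(buf.size(100)) // 2
	fmt.Println(buf.size(200)) // 0: nothing buffered for this collection
}
```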
@@ -142,16 +138,6 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		}
 	}

-	// generate binlog
-	if ddNode.ddBuffer.full() {
-		for k, v := range ddNode.ddBuffer.ddData {
-			ddNode.flushMap.Store(k, v)
-		}
-		ddNode.ddBuffer.ddData = make(map[UniqueID]*ddData)
-		log.Debug(". dd buffer full, auto flushing ...")
-		go flushTxn(ddNode.flushMap, ddNode.kv, ddNode.binlogMeta)
-	}
-
 	select {
 	case fmsg := <-ddNode.inFlushCh:
 		log.Debug(". receive flush message ...")
@@ -159,6 +145,19 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		for _, segID := range fmsg.segmentIDs {
 			if ddNode.replica.hasSegment(segID) {
 				localSegs = append(localSegs, segID)
+
+				seg, _ := ddNode.replica.getSegmentByID(segID)
+				collID := seg.collectionID
+				if ddNode.ddBuffer.size(collID) > 0 {
+					log.Debug(".. ddl buffer not empty, flushing ...")
+					ddNode.flushMap.Store(collID, ddNode.ddBuffer.ddData[collID])
+					delete(ddNode.ddBuffer.ddData, collID)
+
+					binlogMetaCh := make(chan *datapb.DDLBinlogMeta)
+					go flush(collID, ddNode.flushMap, ddNode.kv, ddNode.idAllocator, binlogMetaCh)
+					go ddNode.flushComplete(binlogMetaCh, collID)
+
+				}
 			}
 		}

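The block above hands a collection's buffered ddl off to a `sync.Map` before flushing it asynchronously, so the flowgraph goroutine can keep buffering new ddl while the flush runs. A rough sketch of that handoff pattern with simplified stand-ins (not the real ddNode types):

```go
package main

import (
	"fmt"
	"sync"
)

type collectionID = int64

type ddData struct{ ddRequestString []string }

// handOff moves a collection's buffered data from the live buffer into
// flushMap and spawns an asynchronous flush, mirroring the Operate logic above.
func handOff(buffer map[collectionID]*ddData, flushMap *sync.Map, collID collectionID, wg *sync.WaitGroup) {
	data, ok := buffer[collID]
	if !ok || len(data.ddRequestString) == 0 {
		return // nothing buffered for this collection
	}
	flushMap.Store(collID, data) // snapshot for the flusher
	delete(buffer, collID)       // live buffer keeps accepting new ddl

	wg.Add(1)
	go func() {
		defer wg.Done()
		d, _ := flushMap.LoadAndDelete(collID)
		fmt.Printf("flushing %d ddl entries for collection %d\n",
			len(d.(*ddData).ddRequestString), collID)
	}()
}

func main() {
	buffer := map[collectionID]*ddData{7: {ddRequestString: []string{"CreateCollection"}}}
	var flushMap sync.Map
	var wg sync.WaitGroup
	handOff(buffer, &flushMap, 7, &wg)
	wg.Wait()
}
```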
@@ -167,21 +166,10 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 			break
 		}

-		log.Debug(".. Segments exist, notifying insertbuffer ...")
+		log.Debug(".. notifying insertbuffer ...")
 		fmsg.segmentIDs = localSegs
 		ddNode.ddMsg.flushMessages = append(ddNode.ddMsg.flushMessages, fmsg)
-
-		if ddNode.ddBuffer.size() > 0 {
-			log.Debug(".. ddl buffer not empty, flushing ...")
-			for k, v := range ddNode.ddBuffer.ddData {
-				ddNode.flushMap.Store(k, v)
-			}
-			ddNode.ddBuffer.ddData = make(map[UniqueID]*ddData)
-
-			go flushTxn(ddNode.flushMap, ddNode.kv, ddNode.binlogMeta)
-
-		}

 	default:
 	}

@@ -193,78 +181,101 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 	return []Msg{res}
 }

+func (ddNode *ddNode) flushComplete(binlogMetaCh <-chan *datapb.DDLBinlogMeta, collID UniqueID) {
+	binlogMeta := <-binlogMetaCh
+	if binlogMeta == nil {
+		return
+	}
+
+	log.Debug(".. Saving ddl binlog meta ...")
+	err := ddNode.binlogMeta.SaveDDLBinlogMetaTxn(collID, binlogMeta)
+	if err != nil {
+		log.Error("Save binlog meta to etcd Wrong", zap.Error(err))
+	}
+}
+
 /*
-flushTxn() will do the following:
+flush will
 generate binlogs for all buffer data in ddNode,
 store the generated binlogs to minIO/S3,
-store the keys(paths to minIO/s3) of the binlogs to etcd.
+store the keys(paths to minIO/s3) of the binlogs to etcd. todo remove

 The keys of the binlogs are generated as below:
-	${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
-	${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
+
+ ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
+ ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
 */
-func flushTxn(ddlData *sync.Map,
-	kv kv.BaseKV,
-	meta *binlogMeta) {
-	// generate binlog
+func flush(collID UniqueID, ddlData *sync.Map, kv kv.BaseKV, idAllocator allocatorInterface,
+	binlogMetaCh chan<- *datapb.DDLBinlogMeta) {
+	clearFn := func(isSuccess bool) {
+		if !isSuccess {
+			binlogMetaCh <- nil
+		}
+	}
+
 	ddCodec := &storage.DataDefinitionCodec{}
-	ddlData.Range(func(cID, d interface{}) bool {
+	d, ok := ddlData.LoadAndDelete(collID)
+	if !ok {
+		log.Error("Flush failed ... cannot load ddlData ..")
+		clearFn(false)
+		return
+	}

-		data := d.(*ddData)
-		collID := cID.(int64)
-		log.Debug(".. ddl flushing ...", zap.Int64("collectionID", collID), zap.Int("length", len(data.ddRequestString)))
-		binLogs, err := ddCodec.Serialize(data.timestamps, data.ddRequestString, data.eventTypes)
-		if err != nil || len(binLogs) != 2 {
-			log.Error("Codec Serialize wrong", zap.Error(err))
-			return false
-		}
+	data := d.(*ddData)

-		if len(data.ddRequestString) != len(data.timestamps) ||
-			len(data.timestamps) != len(data.eventTypes) {
-			log.Error("illegal ddBuffer, failed to save binlog")
-			return false
-		}
+	log.Debug(".. ddl flushing ...",
+		zap.Int64("collectionID", collID),
+		zap.Int("length", len(data.ddRequestString)))

-		kvs := make(map[string]string, 2)
-		tsIdx, err := meta.genKey(true)
-		if err != nil {
-			log.Error("Id allocate wrong", zap.Error(err))
-			return false
-		}
-		tsKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[0].GetKey(), tsIdx)
-		kvs[tsKey] = string(binLogs[0].GetValue())
+	binLogs, err := ddCodec.Serialize(data.timestamps, data.ddRequestString, data.eventTypes)
+	if err != nil || len(binLogs) != 2 {
+		log.Error("Codec Serialize wrong", zap.Error(err))
+		clearFn(false)
+		return
+	}

-		ddlIdx, err := meta.genKey(true)
-		if err != nil {
-			log.Error("Id allocate wrong", zap.Error(err))
-			return false
-		}
-		ddlKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[1].GetKey(), ddlIdx)
-		kvs[ddlKey] = string(binLogs[1].GetValue())
+	if len(data.ddRequestString) != len(data.timestamps) ||
+		len(data.timestamps) != len(data.eventTypes) {
+		log.Error("illegal ddBuffer, failed to save binlog")
+		clearFn(false)
+		return
+	}

-		// save ddl/ts binlog to minIO/s3
-		log.Debug(".. Saving ddl binlog to minIO/s3 ...")
-		err = kv.MultiSave(kvs)
-		if err != nil {
-			log.Error("Save to minIO/S3 Wrong", zap.Error(err))
-			_ = kv.MultiRemove([]string{tsKey, ddlKey})
-			return false
-		}
+	kvs := make(map[string]string, 2)
+	tsIdx, err := idAllocator.genKey(true)
+	if err != nil {
+		log.Error("Id allocate wrong", zap.Error(err))
+		clearFn(false)
+		return
+	}
+	tsKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[0].GetKey(), tsIdx)
+	kvs[tsKey] = string(binLogs[0].GetValue())

-		log.Debug(".. Saving ddl binlog meta ...")
-		err = meta.SaveDDLBinlogMetaTxn(collID, tsKey, ddlKey)
-		if err != nil {
-			log.Error("Save binlog meta to etcd Wrong", zap.Error(err))
-			_ = kv.MultiRemove([]string{tsKey, ddlKey})
-			return false
-		}
+	ddlIdx, err := idAllocator.genKey(true)
+	if err != nil {
+		log.Error("Id allocate wrong", zap.Error(err))
+		clearFn(false)
+		return
+	}
+	ddlKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[1].GetKey(), ddlIdx)
+	kvs[ddlKey] = string(binLogs[1].GetValue())

-		log.Debug(".. Clearing ddl flush buffer ...")
-		ddlData.Delete(collID)
-		return true
+	// save ddl/ts binlog to minIO/s3
+	log.Debug(".. Saving ddl binlog to minIO/s3 ...")
+	err = kv.MultiSave(kvs)
+	if err != nil {
+		log.Error("Save to minIO/S3 Wrong", zap.Error(err))
+		_ = kv.MultiRemove([]string{tsKey, ddlKey})
+		clearFn(false)
+		return
+	}

-	})
+	log.Debug(".. Clearing ddl flush buffer ...")
+	clearFn(true)
+	binlogMetaCh <- &datapb.DDLBinlogMeta{
+		DdlBinlogPath: ddlKey,
+		TsBinlogPath:  tsKey,
+	}
+
 	log.Debug(".. DDL flushing completed ...")
 }
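The new `flush`/`flushComplete` pair above communicates through a channel: on any failure `flush` sends `nil` (via `clearFn(false)`) so the waiting goroutine can bail out, and on success it sends the binlog paths. A minimal sketch of that signalling pattern, using simplified stand-in types rather than the real codec, kv, and meta clients:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for datapb.DDLBinlogMeta.
type ddlBinlogMeta struct{ tsBinlogPath, ddlBinlogPath string }

// flush simulates serializing and uploading the buffered ddl, then reports
// either the resulting binlog paths or nil on failure.
func flush(collID int64, fail bool, metaCh chan<- *ddlBinlogMeta) {
	reportFailure := func() { metaCh <- nil } // mirrors clearFn(false)

	// simulate the serialize/upload steps; any error ends with a nil report
	var err error
	if fail {
		err = errors.New("serialize failed")
	}
	if err != nil {
		fmt.Println("flush error:", err)
		reportFailure()
		return
	}
	metaCh <- &ddlBinlogMeta{
		tsBinlogPath:  fmt.Sprintf("ddl_binlog/%d/ts/1", collID),
		ddlBinlogPath: fmt.Sprintf("ddl_binlog/%d/ddl/1", collID),
	}
}

// flushComplete waits for the result; nil means the flush failed and
// there is no meta to persist.
func flushComplete(metaCh <-chan *ddlBinlogMeta, collID int64, done chan<- bool) {
	meta := <-metaCh
	if meta == nil {
		done <- false
		return
	}
	fmt.Printf("saving ddl binlog meta for collection %d: %+v\n", collID, *meta)
	done <- true
}

func main() {
	for _, fail := range []bool{false, true} {
		metaCh := make(chan *ddlBinlogMeta)
		done := make(chan bool)
		go flushComplete(metaCh, 7, done)
		go flush(7, fail, metaCh)
		fmt.Println("flush ok:", <-done)
	}
}
```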
@@ -439,8 +450,8 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
 		append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropPartitionEventType)
 }

-func newDDNode(ctx context.Context, binlogMeta *binlogMeta,
-	inFlushCh <-chan *flushMsg, replica Replica) *ddNode {
+func newDDNode(ctx context.Context, binlogMeta *binlogMeta, inFlushCh <-chan *flushMsg,
+	replica Replica, idAllocator allocatorInterface) *ddNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism

@@ -471,15 +482,14 @@ func newDDNode(ctx context.Context, binlogMeta *binlogMeta,
 		BaseNode:  baseNode,
 		ddRecords: ddRecords,
 		ddBuffer: &ddBuffer{
-			ddData:  make(map[UniqueID]*ddData),
-			maxSize: Params.FlushDdBufferSize,
+			ddData: make(map[UniqueID]*ddData),
 		},
 		inFlushCh: inFlushCh,

-		// idAllocator: alloc,
-		kv:         minioKV,
-		replica:    replica,
-		binlogMeta: binlogMeta,
-		flushMap:   &sync.Map{},
+		idAllocator: idAllocator,
+		kv:          minioKV,
+		replica:     replica,
+		binlogMeta:  binlogMeta,
+		flushMap:    &sync.Map{},
 	}
 }

@@ -48,7 +48,7 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {

 	// Params.FlushDdBufferSize = 4
 	replica := newReplica()
-	ddNode := newDDNode(ctx, newBinlogMeta(), inFlushCh, replica)
+	ddNode := newDDNode(ctx, newBinlogMeta(), inFlushCh, replica, NewAllocatorFactory())

 	collID := UniqueID(0)
 	collName := "col-test-0"

@@ -31,7 +31,6 @@ type ParamTable struct {
 	FlowGraphMaxQueueLength int32
 	FlowGraphMaxParallelism int32
 	FlushInsertBufferSize   int32
-	FlushDdBufferSize       int32
 	InsertBinlogRootPath    string
 	DdlBinlogRootPath       string
 	Log                     log.Config
@@ -88,7 +87,6 @@ func (p *ParamTable) Init() {
 	p.initFlowGraphMaxQueueLength()
 	p.initFlowGraphMaxParallelism()
 	p.initFlushInsertBufferSize()
-	p.initFlushDdBufferSize()
 	p.initInsertBinlogRootPath()
 	p.initDdlBinlogRootPath()
 	p.initLogCfg()
@@ -150,10 +148,6 @@ func (p *ParamTable) initFlushInsertBufferSize() {
 	p.FlushInsertBufferSize = p.ParseInt32("datanode.flush.insertBufSize")
 }

-func (p *ParamTable) initFlushDdBufferSize() {
-	p.FlushDdBufferSize = p.ParseInt32("datanode.flush.ddBufSize")
-}
-
 func (p *ParamTable) initInsertBinlogRootPath() {
 	// GOOSE TODO: rootPath change to TenentID
 	rootPath, err := p.Load("etcd.rootPath")

@@ -40,11 +40,6 @@ func TestParamTable_DataNode(t *testing.T) {
 		log.Println("FlushInsertBufferSize:", size)
 	})

-	t.Run("Test FlushDdBufSize", func(t *testing.T) {
-		size := Params.FlushDdBufferSize
-		log.Println("FlushDdBufferSize:", size)
-	})
-
 	t.Run("Test InsertBinlogRootPath", func(t *testing.T) {
 		path := Params.InsertBinlogRootPath
 		log.Println("InsertBinlogRootPath:", path)