milvus/internal/master/flush_scheduler.go
sunby 95b162ccfd Refactor flush scheduler
Signed-off-by: sunby <bingyi.sun@zilliz.com>
2021-01-05 10:57:59 +08:00


package master

import (
	"context"
	"log"
	"time"

	"github.com/zilliztech/milvus-distributed/internal/errors"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)
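
// FlushScheduler drives segment flushing: it asks the write node to flush
// segments, waits for them to be closed, persists their binlog paths to the
// meta table, and hands indexable fields to the index build scheduler.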
type FlushScheduler struct {
	client              WriteNodeClient
	metaTable           *metaTable
	segmentFlushChan    chan UniqueID
	segmentDescribeChan chan UniqueID
	indexBuilderSch     persistenceScheduler

	ctx    context.Context
	cancel context.CancelFunc
}
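
// NewFlushScheduler creates a FlushScheduler bound to the given write node
// client, meta table and index build scheduler. It derives a child context
// from ctx that is cancelled by Close.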
func NewFlushScheduler(ctx context.Context, client WriteNodeClient, metaTable *metaTable, buildScheduler *IndexBuildScheduler) *FlushScheduler {
	ctx2, cancel := context.WithCancel(ctx)
	return &FlushScheduler{
		client:              client,
		metaTable:           metaTable,
		indexBuilderSch:     buildScheduler,
		segmentFlushChan:    make(chan UniqueID, 100),
		segmentDescribeChan: make(chan UniqueID, 100),
		ctx:                 ctx2,
		cancel:              cancel,
	}
}
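
// schedule asks the write node to flush a single segment and queues it for
// the describe loop, which watches for the segment to be closed.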
func (scheduler *FlushScheduler) schedule(id interface{}) error {
	segmentID := id.(UniqueID)
	if err := scheduler.client.FlushSegment(segmentID); err != nil {
		return err
	}
	log.Printf("flush segment %d", segmentID)
	scheduler.segmentDescribeChan <- segmentID
	return nil
}
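
// describe polls the write node every 100ms for the segments queued by
// schedule. Once a segment is reported closed, its insert binlog paths are
// fetched, indexable fields are enqueued to the index build scheduler, the
// paths are saved to the meta table, and the segment is dropped from the
// pending set.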
func (scheduler *FlushScheduler) describe() error {
	timeTick := time.Tick(100 * time.Millisecond)
	// descTasks tracks segments waiting to be closed; the bool value is unused.
	descTasks := make(map[UniqueID]bool)
	closable := make([]UniqueID, 0)
	for {
		select {
		case <-scheduler.ctx.Done():
			log.Printf("flush scheduler context done, exit describe loop")
			return errors.New("flush scheduler describe loop exit")
		case <-timeTick:
			for singleSegmentID := range descTasks {
				description, err := scheduler.client.DescribeSegment(singleSegmentID)
				if err != nil {
					log.Printf("describe segment %d err %s", singleSegmentID, err.Error())
					continue
				}
				if !description.IsClosed {
					continue
				}

				log.Printf("flush segment %d is closed", singleSegmentID)
				mapData, err := scheduler.client.GetInsertBinlogPaths(singleSegmentID)
				if err != nil {
					log.Printf("get insert binlog paths err, segID: %d, err: %s", singleSegmentID, err.Error())
					continue
				}
				segMeta, err := scheduler.metaTable.GetSegmentByID(singleSegmentID)
				if err != nil {
					log.Printf("get segment from meta table failed, segID: %d, err: %s", singleSegmentID, err.Error())
					continue
				}
				for fieldID, data := range mapData {
					// check field indexable
					indexable, err := scheduler.metaTable.IsIndexable(segMeta.CollectionID, fieldID)
					if err != nil {
						log.Printf("check field indexable from meta table failed, collID: %d, fieldID: %d, err %s", segMeta.CollectionID, fieldID, err.Error())
						continue
					}
					if !indexable {
						continue
					}
					info := &IndexBuildInfo{
						segmentID:      singleSegmentID,
						fieldID:        fieldID,
						binlogFilePath: data,
					}
					err = scheduler.indexBuilderSch.Enqueue(info)
					log.Printf("segment %d field %d enqueue build index scheduler", singleSegmentID, fieldID)
					if err != nil {
						log.Printf("index build enqueue failed, %s", err.Error())
						continue
					}
				}

				// Save binlog paths to the meta table
				segMeta.BinlogFilePaths = make([]*etcdpb.FieldBinlogFiles, 0)
				for k, v := range mapData {
					segMeta.BinlogFilePaths = append(segMeta.BinlogFilePaths, &etcdpb.FieldBinlogFiles{
						FieldID:     k,
						BinlogFiles: v,
					})
				}
				if err = scheduler.metaTable.UpdateSegment(segMeta); err != nil {
					return err
				}

				log.Printf("flush segment %d finished", singleSegmentID)
				closable = append(closable, singleSegmentID)
			}

			// remove closed segments and clear closable
			for _, segID := range closable {
				delete(descTasks, segID)
			}
			closable = closable[:0]
		case segID := <-scheduler.segmentDescribeChan:
			descTasks[segID] = false
		}
	}
}
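
// scheduleLoop consumes segment IDs from the flush channel and schedules a
// flush for each, until the scheduler context is cancelled.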
func (scheduler *FlushScheduler) scheduleLoop() {
	for {
		select {
		case id := <-scheduler.segmentFlushChan:
			err := scheduler.schedule(id)
			if err != nil {
				log.Println(err)
			}
		case <-scheduler.ctx.Done():
			log.Print("server is closed, exit flush scheduler loop")
			return
		}
	}
}
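
// Enqueue submits a segment ID for flushing; id must be a UniqueID.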
func (scheduler *FlushScheduler) Enqueue(id interface{}) error {
	scheduler.segmentFlushChan <- id.(UniqueID)
	return nil
}
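
// Start launches the flush and describe loops as background goroutines.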
func (scheduler *FlushScheduler) Start() error {
	go scheduler.scheduleLoop()
	go scheduler.describe()
	return nil
}
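
// Close cancels the scheduler context, which stops both background loops.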
func (scheduler *FlushScheduler) Close() {
	scheduler.cancel()
}