milvus/internal/datanode/flush_task.go
congqixia ae2a301662
Add injection logic for FlushManager (#10580)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2021-10-25 20:17:34 +08:00

169 lines
4.3 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"errors"
"sync"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
// errStart used for retry start
var errStart = errors.New("start")
// flushInsertTask defines action for flush insert
type flushInsertTask interface {
flushInsertData() error
}
// flushDeleteTask defines action for flush delete
type flushDeleteTask interface {
flushDeleteData() error
}
// flushTaskRunner controls a single flush task lifetime
// this runner will wait insert data flush & del data flush done
// then call the notifyFunc
type flushTaskRunner struct {
sync.WaitGroup
kv.BaseKV
initOnce sync.Once
insertOnce sync.Once
deleteOnce sync.Once
startSignal <-chan struct{}
finishSignal chan struct{}
injectSignal <-chan taskInjection
segmentID UniqueID
insertLogs map[UniqueID]string
statsLogs map[UniqueID]string
deltaLogs []*DelDataBuf
pos *internalpb.MsgPosition
flushed bool
}
type taskInjection struct {
injected chan struct{} // channel to notify injected
injectOver chan bool // indicates injection over
postInjection func(pack *segmentFlushPack)
}
// init initializes flushTaskRunner with provided actions and signal
func (t *flushTaskRunner) init(f notifyMetaFunc, postFunc taskPostFunc, signal <-chan struct{}) {
t.initOnce.Do(func() {
t.startSignal = signal
t.finishSignal = make(chan struct{})
go t.waitFinish(f, postFunc)
})
}
// runFlushInsert executei flush insert task with once and retry
func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, pos *internalpb.MsgPosition) {
t.insertOnce.Do(func() {
t.insertLogs = binlogs
t.statsLogs = statslogs
t.flushed = flushed
t.pos = pos
go func() {
err := errStart
for err != nil {
err = task.flushInsertData()
}
t.Done()
}()
})
}
// runFlushDel execute flush delete task with once and retry
func (t *flushTaskRunner) runFlushDel(task flushDeleteTask, deltaLogs *DelDataBuf) {
t.deleteOnce.Do(func() {
if deltaLogs == nil {
t.deltaLogs = []*DelDataBuf{}
} else {
t.deltaLogs = []*DelDataBuf{deltaLogs}
}
go func() {
err := errStart
for err != nil {
err = task.flushDeleteData()
}
t.Done()
}()
})
}
// waitFinish waits flush & insert done
func (t *flushTaskRunner) waitFinish(notifyFunc notifyMetaFunc, postFunc taskPostFunc) {
// wait insert & del done
t.Wait()
// wait previous task done
<-t.startSignal
pack := t.getFlushPack()
var postInjection postInjectionFunc = nil
select {
case injection := <-t.injectSignal:
// notify injected
injection.injected <- struct{}{}
ok := <-injection.injectOver
if ok {
// apply postInjection func
postInjection = injection.postInjection
}
default:
}
postFunc(pack, postInjection)
// execution done, dequeue and make count --
err := errStart
for err != nil {
err = notifyFunc(pack)
}
// notify next task
close(t.finishSignal)
}
func (t *flushTaskRunner) getFlushPack() *segmentFlushPack {
pack := &segmentFlushPack{
segmentID: t.segmentID,
insertLogs: t.insertLogs,
statsLogs: t.statsLogs,
pos: t.pos,
deltaLogs: t.deltaLogs,
flushed: t.flushed,
}
return pack
}
// newFlushTaskRunner create a usable task runner
func newFlushTaskRunner(segmentID UniqueID, injectCh <-chan taskInjection) *flushTaskRunner {
t := &flushTaskRunner{
WaitGroup: sync.WaitGroup{},
segmentID: segmentID,
injectSignal: injectCh,
}
// insert & del
t.Add(2)
return t
}