Add channelsTimeTicker to proxy (#5339)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
This commit is contained in:
dragondriver 2021-05-24 09:49:28 +08:00 committed by GitHub
parent e326eaad1e
commit 457e964e41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 352 additions and 2 deletions

View File

@ -35,7 +35,7 @@ func newMockMaster() *mockMaster {
}
func genUniqueStr() string {
l := rand.Uint64() % 100
l := rand.Uint64()%100 + 1
b := make([]byte, l)
if _, err := rand.Read(b); err != nil {
return ""
@ -50,7 +50,7 @@ func (m *mockMaster) GetChannels(collectionID UniqueID) (map[vChan]pChan, error)
}
channels = make(map[vChan]pChan)
l := rand.Uint64() % 10
l := rand.Uint64()%10 + 1
for i := 0; uint64(i) < l; i++ {
channels[genUniqueStr()] = genUniqueStr()
}

View File

@ -0,0 +1,196 @@
package proxynode
import (
"context"
"fmt"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
type pChanStatistics struct {
minTs Timestamp
maxTs Timestamp
invalid bool // invalid is true when there is no task in queue
}
// channelsTimeTickerCheckFunc(pchan, ts) return true only when all timestamp of tasks who use the pchan is greater than ts
type channelsTimeTickerCheckFunc func(string, Timestamp) bool
// ticker can update ts only when the minTs greater than the ts of ticker, we can use maxTs to update current later
type getPChanStatisticsFunc func(pChan) (pChanStatistics, error)
// use interface tsoAllocator to keep channelsTimeTickerImpl testable
type tsoAllocator interface {
//Start() error
AllocOne() (Timestamp, error)
//Alloc(count uint32) ([]Timestamp, error)
//ClearCache()
}
type channelsTimeTicker interface {
start() error
close() error
addPChan(pchan pChan) error
getLastTick(pchan pChan) (Timestamp, error)
}
type channelsTimeTickerImpl struct {
interval time.Duration // interval to synchronize
minTsStatistics map[pChan]Timestamp // pchan -> min Timestamp
statisticsMtx sync.RWMutex
getStatistics getPChanStatisticsFunc
tso tsoAllocator
currents map[pChan]Timestamp
currentsMtx sync.RWMutex
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func (ticker *channelsTimeTickerImpl) initStatistics() {
ticker.statisticsMtx.Lock()
defer ticker.statisticsMtx.Unlock()
for pchan := range ticker.minTsStatistics {
ticker.minTsStatistics[pchan] = 0
}
}
func (ticker *channelsTimeTickerImpl) initCurrents(current Timestamp) {
ticker.currentsMtx.Lock()
defer ticker.currentsMtx.Unlock()
for pchan := range ticker.currents {
ticker.currents[pchan] = current
}
}
// What if golang support generic? interface{} is not comparable now!
func getTs(ts1, ts2 Timestamp, comp func(ts1, ts2 Timestamp) bool) Timestamp {
if comp(ts1, ts2) {
return ts1
}
return ts2
}
func (ticker *channelsTimeTickerImpl) tick() error {
ticker.statisticsMtx.Lock()
defer ticker.statisticsMtx.Unlock()
ticker.currentsMtx.Lock()
defer ticker.currentsMtx.Unlock()
for pchan := range ticker.currents {
current := ticker.currents[pchan]
stats, err := ticker.getStatistics(pchan)
if err != nil {
continue
}
if !stats.invalid && stats.minTs > current {
ticker.minTsStatistics[pchan] = current
ticker.currents[pchan] = getTs(current+Timestamp(ticker.interval), stats.maxTs, func(ts1, ts2 Timestamp) bool {
return ts1 > ts2
})
}
}
return nil
}
func (ticker *channelsTimeTickerImpl) tickLoop() {
defer ticker.wg.Done()
timer := time.NewTicker(ticker.interval)
defer timer.Stop()
for {
select {
case <-ticker.ctx.Done():
return
case <-timer.C:
err := ticker.tick()
if err != nil {
log.Warn("channelsTimeTickerImpl.tickLoop", zap.Error(err))
}
}
}
}
func (ticker *channelsTimeTickerImpl) start() error {
ticker.initStatistics()
current, err := ticker.tso.AllocOne()
if err != nil {
return err
}
ticker.initCurrents(current)
ticker.wg.Add(1)
go ticker.tickLoop()
return nil
}
func (ticker *channelsTimeTickerImpl) close() error {
ticker.cancel()
ticker.wg.Wait()
return nil
}
func (ticker *channelsTimeTickerImpl) addPChan(pchan pChan) error {
ticker.statisticsMtx.Lock()
defer ticker.statisticsMtx.Unlock()
if _, ok := ticker.minTsStatistics[pchan]; ok {
return fmt.Errorf("pChan %v already exist", pchan)
}
ticker.minTsStatistics[pchan] = 0
return nil
}
func (ticker *channelsTimeTickerImpl) getLastTick(pchan pChan) (Timestamp, error) {
ticker.statisticsMtx.RLock()
defer ticker.statisticsMtx.RUnlock()
ts, ok := ticker.minTsStatistics[pchan]
if !ok {
return 0, fmt.Errorf("pChan %v not found", pchan)
}
return ts, nil
}
func newChannelsTimeTicker(
ctx context.Context,
interval time.Duration,
pchans []pChan,
getStatistics getPChanStatisticsFunc,
tso tsoAllocator,
) *channelsTimeTickerImpl {
ctx1, cancel := context.WithCancel(ctx)
ticker := &channelsTimeTickerImpl{
interval: interval,
minTsStatistics: make(map[pChan]Timestamp),
getStatistics: getStatistics,
tso: tso,
currents: make(map[pChan]Timestamp),
ctx: ctx1,
cancel: cancel,
}
for _, pchan := range pchans {
ticker.minTsStatistics[pchan] = 0
ticker.currents[pchan] = 0
}
return ticker
}

View File

@ -0,0 +1,154 @@
package proxynode
import (
"context"
"math/rand"
"sync"
"testing"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
)
type mockTsoAllocator struct {
}
func (tso *mockTsoAllocator) AllocOne() (Timestamp, error) {
return Timestamp(time.Now().UnixNano()), nil
}
func newMockTsoAllocator() *mockTsoAllocator {
return &mockTsoAllocator{}
}
func getStatistics(pchan pChan) (pChanStatistics, error) {
stats := pChanStatistics{
minTs: Timestamp(time.Now().UnixNano()),
invalid: false,
}
stats.maxTs = stats.minTs + Timestamp(time.Millisecond*10)
return stats, nil
}
func TestChannelsTimeTickerImpl_start(t *testing.T) {
interval := time.Millisecond * 10
pchanNum := rand.Uint64()%10 + 1
pchans := make([]pChan, 0, pchanNum)
for i := 0; uint64(i) < pchanNum; i++ {
pchans = append(pchans, genUniqueStr())
}
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
err := ticker.start()
assert.Equal(t, nil, err)
defer func() {
err := ticker.close()
assert.Equal(t, nil, err)
}()
time.Sleep(time.Second)
}
func TestChannelsTimeTickerImpl_close(t *testing.T) {
interval := time.Millisecond * 10
pchanNum := rand.Uint64()%10 + 1
pchans := make([]pChan, 0, pchanNum)
for i := 0; uint64(i) < pchanNum; i++ {
pchans = append(pchans, genUniqueStr())
}
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
err := ticker.start()
assert.Equal(t, nil, err)
defer func() {
err := ticker.close()
assert.Equal(t, nil, err)
}()
time.Sleep(time.Second)
}
func TestChannelsTimeTickerImpl_addPChan(t *testing.T) {
interval := time.Millisecond * 10
pchanNum := rand.Uint64()%10 + 1
pchans := make([]pChan, 0, pchanNum)
for i := 0; uint64(i) < pchanNum; i++ {
pchans = append(pchans, genUniqueStr())
}
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
err := ticker.start()
assert.Equal(t, nil, err)
newPChanNum := rand.Uint64()%10 + 1
for i := 0; uint64(i) < newPChanNum; i++ {
err = ticker.addPChan(genUniqueStr())
assert.Equal(t, nil, err)
}
defer func() {
err := ticker.close()
assert.Equal(t, nil, err)
}()
time.Sleep(time.Second)
}
func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) {
interval := time.Millisecond * 10
pchanNum := rand.Uint64()%10 + 1
pchans := make([]pChan, 0, pchanNum)
for i := 0; uint64(i) < pchanNum; i++ {
pchans = append(pchans, genUniqueStr())
}
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
err := ticker.start()
assert.Equal(t, nil, err)
var wg sync.WaitGroup
wg.Add(1)
b := make(chan struct{}, 1)
go func() {
defer wg.Done()
timer := time.NewTicker(interval * 40)
for {
select {
case <-b:
return
case <-timer.C:
for _, pchan := range pchans {
ts, err := ticker.getLastTick(pchan)
assert.Equal(t, nil, err)
log.Debug("TestChannelsTimeTickerImpl_getLastTick",
zap.Any("pchan", pchan),
zap.Any("minTs", ts))
}
}
}
}()
time.Sleep(time.Second)
b <- struct{}{}
wg.Wait()
defer func() {
err := ticker.close()
assert.Equal(t, nil, err)
}()
time.Sleep(time.Second)
}