Add test for tsafe (#8626)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
Xiaofan 2021-09-28 12:28:03 +08:00 committed by GitHub
parent 37e4a3203e
commit e0e7042b5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 11 deletions

View File

@ -13,6 +13,7 @@ package querynode
import (
"context"
"errors"
"math"
"sync"
@ -37,11 +38,6 @@ func (watcher *tSafeWatcher) notify() {
}
}
// deprecated
func (watcher *tSafeWatcher) hasUpdate() {
<-watcher.notifyChan
}
func (watcher *tSafeWatcher) watcherChan() <-chan bool {
return watcher.notifyChan
}
@ -49,7 +45,7 @@ func (watcher *tSafeWatcher) watcherChan() <-chan bool {
type tSafer interface {
get() Timestamp
set(id UniqueID, t Timestamp)
registerTSafeWatcher(t *tSafeWatcher)
registerTSafeWatcher(t *tSafeWatcher) error
start()
close()
removeRecord(partitionID UniqueID)
@ -100,6 +96,7 @@ func (ts *tSafe) start() {
for _, watcher := range ts.watcherList {
close(watcher.notifyChan)
}
ts.watcherList = nil
close(ts.tSafeChan)
ts.tSafeMu.Unlock()
return
@ -140,17 +137,37 @@ func (ts *tSafe) start() {
func (ts *tSafe) removeRecord(partitionID UniqueID) {
ts.tSafeMu.Lock()
defer ts.tSafeMu.Unlock()
if ts.isClose {
// should not happen if tsafe_replica guard correctly
log.Warn("Try to remove record with tsafe close ",
zap.Any("channel", ts.channel),
zap.Any("id", partitionID))
return
}
log.Debug("remove tSafeRecord",
zap.Any("partitionID", partitionID),
)
delete(ts.tSafeRecord, partitionID)
var tmpT Timestamp = math.MaxUint64
for _, t := range ts.tSafeRecord {
if t <= tmpT {
tmpT = t
}
}
ts.tSafe = tmpT
for _, watcher := range ts.watcherList {
watcher.notify()
}
}
func (ts *tSafe) registerTSafeWatcher(t *tSafeWatcher) {
func (ts *tSafe) registerTSafeWatcher(t *tSafeWatcher) error {
ts.tSafeMu.Lock()
if ts.isClose {
return errors.New("Failed to register tsafe watcher because tsafe is closed " + ts.channel)
}
defer ts.tSafeMu.Unlock()
ts.watcherList = append(ts.watcherList, t)
return nil
}
func (ts *tSafe) get() Timestamp {
@ -164,9 +181,9 @@ func (ts *tSafe) set(id UniqueID, t Timestamp) {
defer ts.tSafeMu.Unlock()
if ts.isClose {
// should not happen if tsafe_replica guard correctly
log.Warn("Try to set id with ts close ",
log.Warn("Try to set id with tsafe close ",
zap.Any("channel", ts.channel),
zap.Any("it", id))
zap.Any("id", id))
return
}
msg := tSafeMsg{

View File

@ -13,6 +13,7 @@ package querynode
import (
"context"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@ -20,14 +21,64 @@ import (
func TestTSafe_GetAndSet(t *testing.T) {
tSafe := newTSafe(context.Background(), "TestTSafe-channel")
tSafe.start()
watcher := newTSafeWatcher()
tSafe.registerTSafeWatcher(watcher)
var wg sync.WaitGroup
wg.Add(1)
go func() {
watcher.hasUpdate()
// wait work
<-watcher.watcherChan()
timestamp := tSafe.get()
assert.Equal(t, timestamp, Timestamp(1000))
wg.Done()
}()
tSafe.set(UniqueID(1), Timestamp(1000))
wg.Wait()
}
func TestTSafe_Remove(t *testing.T) {
tSafe := newTSafe(context.Background(), "TestTSafe-remove")
tSafe.start()
watcher := newTSafeWatcher()
tSafe.registerTSafeWatcher(watcher)
tSafe.set(UniqueID(1), Timestamp(1000))
tSafe.set(UniqueID(2), Timestamp(1001))
<-watcher.watcherChan()
timestamp := tSafe.get()
assert.Equal(t, timestamp, Timestamp(1000))
tSafe.removeRecord(UniqueID(1))
timestamp = tSafe.get()
assert.Equal(t, timestamp, Timestamp(1001))
}
func TestTSafe_Close(t *testing.T) {
tSafe := newTSafe(context.Background(), "TestTSafe-close")
tSafe.start()
watcher := newTSafeWatcher()
tSafe.registerTSafeWatcher(watcher)
// test set won't panic while close
go func() {
for i := 0; i <= 100; i++ {
tSafe.set(UniqueID(i), Timestamp(1000))
}
}()
tSafe.close()
// wait until channel close
for range watcher.watcherChan() {
}
tSafe.set(UniqueID(101), Timestamp(1000))
tSafe.removeRecord(UniqueID(1))
// register TSafe will fail
err := tSafe.registerTSafeWatcher(watcher)
assert.Error(t, err)
}