Remove tSafeWatcher and vChannel when release (#12264)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-11-26 01:33:16 +08:00 committed by GitHub
parent ad83090925
commit 58964621a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 220 additions and 56 deletions

View File

@ -42,10 +42,10 @@ type Collection struct {
id UniqueID
partitionIDs []UniqueID
schema *schemapb.CollectionSchema
channelMu sync.RWMutex
vChannels []Channel
pChannels []Channel
channelMu sync.RWMutex
vChannels []Channel
pChannels []Channel
vDeltaChannels []Channel
pDeltaChannels []Channel
@ -114,7 +114,22 @@ OUTER:
func (c *Collection) getVChannels() []Channel {
c.channelMu.RLock()
defer c.channelMu.RUnlock()
return c.vChannels
tmpChannels := make([]Channel, len(c.vChannels))
copy(tmpChannels, c.vChannels)
return tmpChannels
}
// removeVChannel removes every occurrence of the given virtual channel from
// the collection's vChannels.
//
// The write lock on channelMu is held for the whole operation. The surviving
// channels are gathered into a freshly allocated slice, in their original
// order, which then replaces c.vChannels; the previous slice is not modified.
// Removing a channel that is not present leaves the contents unchanged.
func (c *Collection) removeVChannel(channel Channel) {
	c.channelMu.Lock()
	defer c.channelMu.Unlock()

	remaining := make([]Channel, 0)
	for i := range c.vChannels {
		if c.vChannels[i] == channel {
			// This is the channel being removed; leave it out.
			continue
		}
		remaining = append(remaining, c.vChannels[i])
	}
	c.vChannels = remaining
}
// addPChannels add physical channels to physical channels of collection
@ -144,11 +159,15 @@ OUTER:
func (c *Collection) getPChannels() []Channel {
c.channelMu.RLock()
defer c.channelMu.RUnlock()
return c.pChannels
tmpChannels := make([]Channel, len(c.pChannels))
copy(tmpChannels, c.pChannels)
return tmpChannels
}
// addPDeltaChannels adds physical delta channels to the physical delta channels of collection
func (c *Collection) addPDeltaChannels(channels []Channel) {
c.channelMu.Lock()
defer c.channelMu.Unlock()
OUTER:
for _, dstChan := range channels {
for _, srcChan := range c.pDeltaChannels {
@ -170,15 +189,25 @@ OUTER:
// getPDeltaChannels gets the physical delta channels of collection
func (c *Collection) getPDeltaChannels() []Channel {
return c.pDeltaChannels
c.channelMu.RLock()
defer c.channelMu.RUnlock()
tmpChannels := make([]Channel, len(c.pDeltaChannels))
copy(tmpChannels, c.pDeltaChannels)
return tmpChannels
}
func (c *Collection) getVDeltaChannels() []Channel {
return c.vDeltaChannels
c.channelMu.RLock()
defer c.channelMu.RUnlock()
tmpChannels := make([]Channel, len(c.vDeltaChannels))
copy(tmpChannels, c.vDeltaChannels)
return tmpChannels
}
// addVDeltaChannels adds virtual delta channels to collection
func (c *Collection) addVDeltaChannels(channels []Channel) {
c.channelMu.Lock()
defer c.channelMu.Unlock()
OUTER:
for _, dstChan := range channels {
for _, srcChan := range c.vDeltaChannels {
@ -198,6 +227,18 @@ OUTER:
}
}
// removeVDeltaChannel removes every occurrence of the given virtual delta
// channel from the collection's vDeltaChannels.
//
// The write lock on channelMu is held while the list is rebuilt. Channels
// that do not match are kept in their original order in a newly allocated
// slice, which replaces c.vDeltaChannels.
func (c *Collection) removeVDeltaChannel(channel Channel) {
	c.channelMu.Lock()
	defer c.channelMu.Unlock()

	kept := make([]Channel, 0)
	for _, candidate := range c.vDeltaChannels {
		if candidate == channel {
			// Matches the channel being removed; skip it.
			continue
		}
		kept = append(kept, candidate)
	}
	c.vDeltaChannels = kept
}
// setReleaseTime records when collection is released
func (c *Collection) setReleaseTime(t Timestamp) {
c.releaseMu.Lock()

View File

@ -50,12 +50,16 @@ func TestCollection_vChannel(t *testing.T) {
collectionMeta := genTestCollectionMeta(collectionID, false)
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
collection.addVChannels([]string{defaultVChannel})
collection.addVChannels([]string{defaultVChannel})
collection.addVChannels([]string{"TestCollection_addVChannel_channel"})
collection.addVChannels([]Channel{defaultVChannel})
collection.addVChannels([]Channel{defaultVChannel})
collection.addVChannels([]Channel{"TestCollection_addVChannel_channel"})
channels := collection.getVChannels()
assert.Equal(t, 2, len(channels))
collection.removeVChannel(defaultVChannel)
channels = collection.getVChannels()
assert.Equal(t, 1, len(channels))
}
func TestCollection_vDeltaChannel(t *testing.T) {
@ -63,12 +67,16 @@ func TestCollection_vDeltaChannel(t *testing.T) {
collectionMeta := genTestCollectionMeta(collectionID, false)
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
collection.addVDeltaChannels([]string{defaultHistoricalVChannel})
collection.addVDeltaChannels([]string{defaultHistoricalVChannel})
collection.addVDeltaChannels([]string{"TestCollection_addVDeltaChannel_channel"})
collection.addVDeltaChannels([]Channel{defaultHistoricalVChannel})
collection.addVDeltaChannels([]Channel{defaultHistoricalVChannel})
collection.addVDeltaChannels([]Channel{"TestCollection_addVDeltaChannel_channel"})
channels := collection.getVDeltaChannels()
assert.Equal(t, 2, len(channels))
collection.removeVDeltaChannel(defaultHistoricalVChannel)
channels = collection.getVDeltaChannels()
assert.Equal(t, 1, len(channels))
}
func TestCollection_pChannel(t *testing.T) {
@ -76,9 +84,9 @@ func TestCollection_pChannel(t *testing.T) {
collectionMeta := genTestCollectionMeta(collectionID, false)
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
collection.addPChannels([]string{"TestCollection_addPChannel_channel-0"})
collection.addPChannels([]string{"TestCollection_addPChannel_channel-0"})
collection.addPChannels([]string{"TestCollection_addPChannel_channel-1"})
collection.addPChannels([]Channel{"TestCollection_addPChannel_channel-0"})
collection.addPChannels([]Channel{"TestCollection_addPChannel_channel-0"})
collection.addPChannels([]Channel{"TestCollection_addPChannel_channel-1"})
channels := collection.getPChannels()
assert.Equal(t, 2, len(channels))
@ -89,9 +97,9 @@ func TestCollection_pDeltaChannel(t *testing.T) {
collectionMeta := genTestCollectionMeta(collectionID, false)
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
collection.addPDeltaChannels([]string{"TestCollection_addPDeltaChannel_channel-0"})
collection.addPDeltaChannels([]string{"TestCollection_addPDeltaChannel_channel-0"})
collection.addPDeltaChannels([]string{"TestCollection_addPDeltaChannel_channel-1"})
collection.addPDeltaChannels([]Channel{"TestCollection_addPDeltaChannel_channel-0"})
collection.addPDeltaChannels([]Channel{"TestCollection_addPDeltaChannel_channel-0"})
collection.addPDeltaChannels([]Channel{"TestCollection_addPDeltaChannel_channel-1"})
channels := collection.getPDeltaChannels()
assert.Equal(t, 2, len(channels))

View File

@ -245,8 +245,8 @@ func TestDataSyncService_removePartitionFlowGraphs(t *testing.T) {
dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel})
err = dataSyncService.tSafeReplica.removeTSafe(defaultVChannel)
assert.NoError(t, err)
isRemoved := dataSyncService.tSafeReplica.removeTSafe(defaultVChannel)
assert.True(t, isRemoved)
dataSyncService.removePartitionFlowGraph(defaultPartitionID)
})
}

View File

@ -80,8 +80,8 @@ func TestServiceTimeNode_Operate(t *testing.T) {
t.Run("test no tSafe", func(t *testing.T) {
node := genServiceTimeNode()
err := node.tSafeReplica.removeTSafe(defaultVChannel)
assert.NoError(t, err)
isRemoved := node.tSafeReplica.removeTSafe(defaultVChannel)
assert.True(t, isRemoved)
msg := &serviceTimeMsg{
timeRange: TimeRange{
timestampMin: 0,

View File

@ -193,15 +193,36 @@ func (q *queryCollection) addTSafeWatcher(vChannel Channel) error {
zap.Any("collectionID", q.collectionID),
zap.Any("channel", vChannel),
)
go q.startWatcher(q.tSafeWatchers[vChannel].watcherChan())
go q.startWatcher(q.tSafeWatchers[vChannel].watcherChan(), q.tSafeWatchers[vChannel].closeCh)
return nil
}
// TODO: add stopWatcher(), add close() to tSafeWatcher
func (q *queryCollection) startWatcher(channel <-chan bool) {
// removeTSafeWatcher closes and unregisters the tSafeWatcher that this
// queryCollection holds for the given channel.
//
// It returns an error when no watcher is registered for the channel;
// otherwise the watcher's close method is called, its entry is deleted from
// q.tSafeWatchers, and nil is returned. tSafeWatchersMu is held for the
// whole call.
func (q *queryCollection) removeTSafeWatcher(channel Channel) error {
	q.tSafeWatchersMu.Lock()
	defer q.tSafeWatchersMu.Unlock()

	watcher, found := q.tSafeWatchers[channel]
	if !found {
		msg := fmt.Sprintln("tSafeWatcher of queryCollection not exists, ",
			"collectionID = ", q.collectionID, ", ",
			"channel = ", channel)
		return errors.New(msg)
	}

	// Signal the watcher before dropping the only reference kept in the map.
	watcher.close()
	delete(q.tSafeWatchers, channel)

	log.Debug("remove tSafeWatcher from queryCollection",
		zap.Any("collectionID", q.collectionID),
		zap.Any("channel", channel),
	)
	return nil
}
func (q *queryCollection) startWatcher(channel <-chan bool, closeCh <-chan struct{}) {
for {
select {
case <-q.releaseCtx.Done():
log.Debug("stop queryCollection watcher because queryCollection ctx done", zap.Any("collectionID", q.collectionID))
return
case <-closeCh:
log.Debug("stop queryCollection watcher because watcher closed", zap.Any("collectionID", q.collectionID))
return
case <-channel:
// TODO: check if channel is closed
@ -514,7 +535,7 @@ func (q *queryCollection) doUnsolvedQueryMsg() {
//time.Sleep(10 * time.Millisecond)
serviceTime, err := q.waitNewTSafe()
if err != nil {
log.Error(err.Error())
log.Error("[should not happen!] stop doUnsolvedMsg, err = " + err.Error())
return
}
//st, _ := tsoutil.ParseTS(serviceTime)

View File

@ -472,13 +472,21 @@ func TestQueryCollection_serviceableTime(t *testing.T) {
assert.Equal(t, st+gracefulTime, resST)
}
func TestQueryCollection_addTSafeWatcher(t *testing.T) {
func TestQueryCollection_tSafeWatcher(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
queryCollection, err := genSimpleQueryCollection(ctx, cancel)
assert.NoError(t, err)
queryCollection.addTSafeWatcher(defaultVChannel)
err = queryCollection.addTSafeWatcher(defaultVChannel)
assert.NoError(t, err)
err = queryCollection.removeTSafeWatcher(defaultVChannel)
assert.NoError(t, err)
// no tSafe watcher
err = queryCollection.removeTSafeWatcher(defaultVChannel)
assert.Error(t, err)
}
func TestQueryCollection_waitNewTSafe(t *testing.T) {

View File

@ -731,10 +731,9 @@ func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replica
zap.Any("vChannel", channel),
)
// no tSafe in tSafeReplica, don't return error
err = r.node.tSafeReplica.removeTSafe(channel)
if err != nil {
log.Warn(err.Error())
}
_ = r.node.tSafeReplica.removeTSafe(channel)
// queryCollection and Collection would be deleted in releaseCollection,
// so we don't need to remove the tSafeWatcher or channel manually.
}
} else {
r.node.dataSyncService.removeCollectionDeltaFlowGraph(r.req.CollectionID)
@ -745,10 +744,9 @@ func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replica
zap.Any("vDeltaChannel", channel),
)
// no tSafe in tSafeReplica, don't return error
err = r.node.tSafeReplica.removeTSafe(channel)
if err != nil {
log.Warn(err.Error())
}
_ = r.node.tSafeReplica.removeTSafe(channel)
// queryCollection and Collection would be deleted in releaseCollection,
// so we don't need to remove the tSafeWatcher or channel manually.
}
}
@ -821,9 +819,20 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
zap.Any("vChannel", channel),
)
// no tSafe in tSafeReplica, don't return error
err = r.node.tSafeReplica.removeTSafe(channel)
if err != nil {
log.Warn(err.Error())
isRemoved := r.node.tSafeReplica.removeTSafe(channel)
if isRemoved {
// no tSafe or tSafe has been removed,
// we need to remove the corresponding tSafeWatcher in queryCollection,
// and remove the corresponding channel in collection
qc, err := r.node.queryService.getQueryCollection(r.req.CollectionID)
if err != nil {
return err
}
err = qc.removeTSafeWatcher(channel)
if err != nil {
return err
}
sCol.removeVChannel(channel)
}
}
}
@ -864,9 +873,20 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
zap.Any("vChannel", channel),
)
// no tSafe in tSafeReplica, don't return error
err = r.node.tSafeReplica.removeTSafe(channel)
if err != nil {
log.Warn(err.Error())
isRemoved := r.node.tSafeReplica.removeTSafe(channel)
if isRemoved {
// no tSafe or tSafe has been removed,
// we need to remove the corresponding tSafeWatcher in queryCollection,
// and remove the corresponding channel in collection
qc, err := r.node.queryService.getQueryCollection(r.req.CollectionID)
if err != nil {
return err
}
err = qc.removeTSafeWatcher(channel)
if err != nil {
return err
}
sCol.removeVDeltaChannel(channel)
}
}
}

View File

@ -390,6 +390,9 @@ func TestTask_releaseCollectionTask(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
err = node.queryService.addQueryCollection(defaultCollectionID)
assert.NoError(t, err)
task := releaseCollectionTask{
req: genReleaseCollectionRequest(),
node: node,
@ -414,6 +417,25 @@ func TestTask_releaseCollectionTask(t *testing.T) {
err = task.Execute(ctx)
assert.Error(t, err)
})
t.Run("test execute remove deltaVChannel tSafe", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
err = node.queryService.addQueryCollection(defaultCollectionID)
assert.NoError(t, err)
col, err := node.historical.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
col.addVDeltaChannels([]Channel{defaultHistoricalVChannel})
task := releaseCollectionTask{
req: genReleaseCollectionRequest(),
node: node,
}
err = task.Execute(ctx)
assert.NoError(t, err)
})
}
func TestTask_releasePartitionTask(t *testing.T) {
@ -457,6 +479,9 @@ func TestTask_releasePartitionTask(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
err = node.queryService.addQueryCollection(defaultCollectionID)
assert.NoError(t, err)
task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
node: node,
@ -486,4 +511,30 @@ func TestTask_releasePartitionTask(t *testing.T) {
assert.Error(t, err)
})
t.Run("test execute, remove deltaVChannel", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
col, err := node.historical.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
err = node.historical.replica.removePartition(defaultPartitionID)
assert.NoError(t, err)
col.addVDeltaChannels([]Channel{defaultHistoricalVChannel})
col.setLoadType(loadTypePartition)
err = node.queryService.addQueryCollection(defaultCollectionID)
assert.NoError(t, err)
task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
node: node,
}
task.node.dataSyncService.addPartitionFlowGraph(defaultCollectionID,
defaultPartitionID,
[]Channel{defaultVChannel})
err = task.Execute(ctx)
assert.NoError(t, err)
})
}

View File

@ -24,11 +24,13 @@ import (
// tSafeWatcher bundles the two channels used to observe a tSafe: one that
// carries notifications and one that carries a stop signal.
type tSafeWatcher struct {
// notifyChan is the channel handed out (receive-only) by watcherChan.
notifyChan chan bool
// closeCh receives a value when close is called; queryCollection.startWatcher
// selects on it to know when to stop watching.
closeCh chan struct{}
}
// newTSafeWatcher allocates a tSafeWatcher whose notification channel and
// close channel each have a buffer of one element.
func newTSafeWatcher() *tSafeWatcher {
	w := new(tSafeWatcher)
	w.notifyChan = make(chan bool, 1)
	w.closeCh = make(chan struct{}, 1)
	return w
}
@ -42,6 +44,10 @@ func (watcher *tSafeWatcher) watcherChan() <-chan bool {
return watcher.notifyChan
}
// close asks whoever is selecting on closeCh to stop watching.
//
// The signal is sent without blocking. closeCh is created by newTSafeWatcher
// with a buffer of one, so the first call always deposits the signal even
// when no goroutine is receiving yet. If a signal is already pending (close
// was called before and nobody drained it, for example because the watcher
// goroutine exited for another reason), the call returns immediately instead
// of blocking the caller forever. That matters because callers may invoke
// close while holding a lock.
func (watcher *tSafeWatcher) close() {
	select {
	case watcher.closeCh <- struct{}{}:
	default:
		// A stop signal is already queued; sending another would block.
	}
}
type tSafer interface {
get() Timestamp
set(id UniqueID, t Timestamp)

View File

@ -26,7 +26,7 @@ type TSafeReplicaInterface interface {
getTSafe(vChannel Channel) (Timestamp, error)
setTSafe(vChannel Channel, id UniqueID, timestamp Timestamp) error
addTSafe(vChannel Channel)
removeTSafe(vChannel Channel) error
removeTSafe(vChannel Channel) bool
registerTSafeWatcher(vChannel Channel, watcher *tSafeWatcher) error
removeRecord(vChannel Channel, partitionID UniqueID) error
}
@ -100,12 +100,13 @@ func (t *tSafeReplica) addTSafe(vChannel Channel) {
}
}
func (t *tSafeReplica) removeTSafe(vChannel Channel) error {
func (t *tSafeReplica) removeTSafe(vChannel Channel) bool {
t.mu.Lock()
defer t.mu.Unlock()
if _, ok := t.tSafes[vChannel]; !ok {
return errors.New("tSafe not exist, vChannel = " + vChannel)
return false
}
isRemoved := false
t.tSafes[vChannel].ref--
log.Debug("reduce tSafe reference count",
zap.Any("vChannel", vChannel),
@ -114,15 +115,17 @@ func (t *tSafeReplica) removeTSafe(vChannel Channel) error {
if t.tSafes[vChannel].ref == 0 {
safer, err := t.getTSaferPrivate(vChannel)
if err != nil {
return err
log.Warn(err.Error())
return false
}
log.Debug("remove tSafe replica",
zap.Any("vChannel", vChannel),
)
safer.close()
delete(t.tSafes, vChannel)
isRemoved = true
}
return nil
return isRemoved
}
func (t *tSafeReplica) removeRecord(vChannel Channel, partitionID UniqueID) error {

View File

@ -34,8 +34,8 @@ func TestTSafeReplica_valid(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, timestamp, resT)
err = replica.removeTSafe(defaultVChannel)
assert.NoError(t, err)
isRemoved := replica.removeTSafe(defaultVChannel)
assert.True(t, isRemoved)
}
func TestTSafeReplica_invalid(t *testing.T) {
@ -54,8 +54,8 @@ func TestTSafeReplica_invalid(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, timestamp, resT)
err = replica.removeTSafe(defaultVChannel)
assert.NoError(t, err)
isRemoved := replica.removeTSafe(defaultVChannel)
assert.True(t, isRemoved)
replica.addTSafe(defaultVChannel)
replica.addTSafe(defaultVChannel)

View File

@ -23,7 +23,9 @@ func TestTSafe_GetAndSet(t *testing.T) {
tSafe := newTSafe(context.Background(), "TestTSafe-channel")
tSafe.start()
watcher := newTSafeWatcher()
tSafe.registerTSafeWatcher(watcher)
defer watcher.close()
err := tSafe.registerTSafeWatcher(watcher)
assert.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
@ -42,7 +44,9 @@ func TestTSafe_Remove(t *testing.T) {
tSafe := newTSafe(context.Background(), "TestTSafe-remove")
tSafe.start()
watcher := newTSafeWatcher()
tSafe.registerTSafeWatcher(watcher)
defer watcher.close()
err := tSafe.registerTSafeWatcher(watcher)
assert.NoError(t, err)
tSafe.set(UniqueID(1), Timestamp(1000))
tSafe.set(UniqueID(2), Timestamp(1001))
@ -60,7 +64,9 @@ func TestTSafe_Close(t *testing.T) {
tSafe := newTSafe(context.Background(), "TestTSafe-close")
tSafe.start()
watcher := newTSafeWatcher()
tSafe.registerTSafeWatcher(watcher)
defer watcher.close()
err := tSafe.registerTSafeWatcher(watcher)
assert.NoError(t, err)
// test set won't panic while close
go func() {
@ -79,6 +85,6 @@ func TestTSafe_Close(t *testing.T) {
tSafe.set(UniqueID(101), Timestamp(1000))
tSafe.removeRecord(UniqueID(1))
// register TSafe will fail
err := tSafe.registerTSafeWatcher(watcher)
err = tSafe.registerTSafeWatcher(watcher)
assert.Error(t, err)
}