mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-04 04:49:08 +08:00
Make tsafe close not elegant (#8582)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
d38adda8c2
commit
8465d03dec
@ -34,7 +34,6 @@ func newTSafeWatcher() *tSafeWatcher {
|
||||
func (watcher *tSafeWatcher) notify() {
|
||||
if len(watcher.notifyChan) == 0 {
|
||||
watcher.notifyChan <- true
|
||||
//log.Debug("tSafe watcher notify done")
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,8 +69,7 @@ type tSafe struct {
|
||||
watcherList []*tSafeWatcher
|
||||
tSafeChan chan tSafeMsg
|
||||
tSafeRecord map[UniqueID]Timestamp
|
||||
// waitgroup for closing control
|
||||
closeWg sync.WaitGroup
|
||||
isClose bool
|
||||
}
|
||||
|
||||
func newTSafe(ctx context.Context, channel Channel) tSafer {
|
||||
@ -90,17 +88,26 @@ func newTSafe(ctx context.Context, channel Channel) tSafer {
|
||||
}
|
||||
|
||||
func (ts *tSafe) start() {
|
||||
ts.closeWg.Add(1)
|
||||
go func() {
|
||||
defer ts.closeWg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ts.ctx.Done():
|
||||
ts.tSafeMu.Lock()
|
||||
ts.isClose = true
|
||||
log.Debug("tSafe context done",
|
||||
zap.Any("channel", ts.channel),
|
||||
)
|
||||
for _, watcher := range ts.watcherList {
|
||||
close(watcher.notifyChan)
|
||||
}
|
||||
close(ts.tSafeChan)
|
||||
ts.tSafeMu.Unlock()
|
||||
return
|
||||
case m := <-ts.tSafeChan:
|
||||
case m, ok := <-ts.tSafeChan:
|
||||
if !ok {
|
||||
// should not happen!!
|
||||
return
|
||||
}
|
||||
ts.tSafeMu.Lock()
|
||||
ts.tSafeRecord[m.id] = m.t
|
||||
var tmpT Timestamp = math.MaxUint64
|
||||
@ -155,7 +162,13 @@ func (ts *tSafe) get() Timestamp {
|
||||
func (ts *tSafe) set(id UniqueID, t Timestamp) {
|
||||
ts.tSafeMu.Lock()
|
||||
defer ts.tSafeMu.Unlock()
|
||||
|
||||
if ts.isClose {
|
||||
// should not happen if tsafe_replica guard correctly
|
||||
log.Warn("Try to set id with ts close ",
|
||||
zap.Any("channel", ts.channel),
|
||||
zap.Any("it", id))
|
||||
return
|
||||
}
|
||||
msg := tSafeMsg{
|
||||
t: t,
|
||||
id: id,
|
||||
@ -165,13 +178,4 @@ func (ts *tSafe) set(id UniqueID, t Timestamp) {
|
||||
|
||||
func (ts *tSafe) close() {
|
||||
ts.cancel()
|
||||
// wait for all job done
|
||||
ts.closeWg.Wait()
|
||||
|
||||
ts.tSafeMu.Lock()
|
||||
defer ts.tSafeMu.Unlock()
|
||||
|
||||
for _, watcher := range ts.watcherList {
|
||||
close(watcher.notifyChan)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user