Use typeutil.ConcurrentMap instead of sync.Map (#25846)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-07-24 10:23:01 +08:00 committed by GitHub
parent 31b40b3ccc
commit 3c503afe7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 191 additions and 228 deletions

View File

@ -20,7 +20,6 @@ import (
"fmt" "fmt"
"path" "path"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -31,15 +30,17 @@ import (
"github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
type channelStateTimer struct { type channelStateTimer struct {
watchkv kv.WatchKV watchkv kv.WatchKV
runningTimers sync.Map runningTimers *typeutil.ConcurrentMap[string, *time.Timer]
runningTimerStops sync.Map // channel name to timer stop channels runningTimerStops *typeutil.ConcurrentMap[string, chan struct{}] // channel name to timer stop channels
etcdWatcher clientv3.WatchChan
timeoutWatcher chan *ackEvent etcdWatcher clientv3.WatchChan
timeoutWatcher chan *ackEvent
//Modifies afterwards must guarantee that runningTimerCount is updated synchronized with runningTimers //Modifies afterwards must guarantee that runningTimerCount is updated synchronized with runningTimers
//in order to keep consistency //in order to keep consistency
runningTimerCount atomic.Int32 runningTimerCount atomic.Int32
@ -47,8 +48,10 @@ type channelStateTimer struct {
func newChannelStateTimer(kv kv.WatchKV) *channelStateTimer { func newChannelStateTimer(kv kv.WatchKV) *channelStateTimer {
return &channelStateTimer{ return &channelStateTimer{
watchkv: kv, watchkv: kv,
timeoutWatcher: make(chan *ackEvent, 20), timeoutWatcher: make(chan *ackEvent, 20),
runningTimers: typeutil.NewConcurrentMap[string, *time.Timer](),
runningTimerStops: typeutil.NewConcurrentMap[string, chan struct{}](),
} }
} }
@ -103,8 +106,8 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
stop := make(chan struct{}) stop := make(chan struct{})
ticker := time.NewTimer(timeout) ticker := time.NewTimer(timeout)
c.removeTimers([]string{channelName}) c.removeTimers([]string{channelName})
c.runningTimerStops.Store(channelName, stop) c.runningTimerStops.Insert(channelName, stop)
c.runningTimers.Store(channelName, ticker) c.runningTimers.Insert(channelName, ticker)
c.runningTimerCount.Inc() c.runningTimerCount.Inc()
go func() { go func() {
log.Info("timer started", log.Info("timer started",
@ -145,9 +148,9 @@ func (c *channelStateTimer) notifyTimeoutWatcher(e *ackEvent) {
func (c *channelStateTimer) removeTimers(channels []string) { func (c *channelStateTimer) removeTimers(channels []string) {
for _, channel := range channels { for _, channel := range channels {
if stop, ok := c.runningTimerStops.LoadAndDelete(channel); ok { if stop, ok := c.runningTimerStops.GetAndRemove(channel); ok {
close(stop.(chan struct{})) close(stop)
c.runningTimers.Delete(channel) c.runningTimers.GetAndRemove(channel)
c.runningTimerCount.Dec() c.runningTimerCount.Dec()
log.Info("remove timer for channel", zap.String("channel", channel), log.Info("remove timer for channel", zap.String("channel", channel),
zap.Int32("timerCount", c.runningTimerCount.Load())) zap.Int32("timerCount", c.runningTimerCount.Load()))
@ -156,10 +159,10 @@ func (c *channelStateTimer) removeTimers(channels []string) {
} }
func (c *channelStateTimer) stopIfExist(e *ackEvent) { func (c *channelStateTimer) stopIfExist(e *ackEvent) {
stop, ok := c.runningTimerStops.LoadAndDelete(e.channelName) stop, ok := c.runningTimerStops.GetAndRemove(e.channelName)
if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck { if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
close(stop.(chan struct{})) close(stop)
c.runningTimers.Delete(e.channelName) c.runningTimers.GetAndRemove(e.channelName)
c.runningTimerCount.Dec() c.runningTimerCount.Dec()
log.Info("stop timer for channel", zap.String("channel", e.channelName), log.Info("stop timer for channel", zap.String("channel", e.channelName),
zap.Int32("timerCount", c.runningTimerCount.Load())) zap.Int32("timerCount", c.runningTimerCount.Load()))
@ -167,8 +170,7 @@ func (c *channelStateTimer) stopIfExist(e *ackEvent) {
} }
func (c *channelStateTimer) resetIfExist(channel string, interval time.Duration) { func (c *channelStateTimer) resetIfExist(channel string, interval time.Duration) {
if value, ok := c.runningTimers.Load(channel); ok { if timer, ok := c.runningTimers.Get(channel); ok {
timer := value.(*time.Timer)
timer.Reset(interval) timer.Reset(interval)
} }
} }

View File

@ -132,18 +132,18 @@ func TestChannelStateTimer(t *testing.T) {
timer := newChannelStateTimer(kv) timer := newChannelStateTimer(kv)
timer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, 20*time.Second) timer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, 20*time.Second)
stop, ok := timer.runningTimerStops.Load("channel-1") stop, ok := timer.runningTimerStops.Get("channel-1")
require.True(t, ok) require.True(t, ok)
timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-1", 1, 20*time.Second) timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-1", 1, 20*time.Second)
_, ok = <-stop.(chan struct{}) _, ok = <-stop
assert.False(t, ok) assert.False(t, ok)
stop2, ok := timer.runningTimerStops.Load("channel-1") stop2, ok := timer.runningTimerStops.Get("channel-1")
assert.True(t, ok) assert.True(t, ok)
timer.removeTimers([]string{"channel-1"}) timer.removeTimers([]string{"channel-1"})
_, ok = <-stop2.(chan struct{}) _, ok = <-stop2
assert.False(t, ok) assert.False(t, ok)
}) })
} }

View File

@ -127,7 +127,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_WatchSuccess, nodeID, cName, collectionID) waitAndCheckState(t, watchkv, datapb.ChannelWatchState_WatchSuccess, nodeID, cName, collectionID)
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimerStops.Load(cName) loaded := chManager.stateTimer.runningTimerStops.Contain(cName)
return !loaded return !loaded
}, waitFor, tick) }, waitFor, tick)
@ -157,7 +157,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToRelease, nodeID, cName, collectionID) waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToRelease, nodeID, cName, collectionID)
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimerStops.Load(cName) loaded := chManager.stateTimer.runningTimerStops.Contain(cName)
return loaded return loaded
}, waitFor, tick) }, waitFor, tick)
@ -193,7 +193,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToRelease, nodeID, cName, collectionID) waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToRelease, nodeID, cName, collectionID)
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimerStops.Load(cName) loaded := chManager.stateTimer.runningTimerStops.Contain(cName)
return loaded return loaded
}, waitFor, tick) }, waitFor, tick)
@ -242,7 +242,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
assert.Empty(t, w) assert.Empty(t, w)
_, loaded := chManager.stateTimer.runningTimerStops.Load(cName) loaded := chManager.stateTimer.runningTimerStops.Contain(cName)
assert.True(t, loaded) assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{cName}) chManager.stateTimer.removeTimers([]string{cName})
}) })
@ -279,7 +279,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, nodeID, cName, collectionID) waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, nodeID, cName, collectionID)
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimerStops.Load(cName) loaded := chManager.stateTimer.runningTimerStops.Contain(cName)
return loaded return loaded
}, waitFor, tick) }, waitFor, tick)
cancel() cancel()
@ -331,7 +331,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
assert.Empty(t, w) assert.Empty(t, w)
_, loaded := chManager.stateTimer.runningTimerStops.Load(cName) loaded := chManager.stateTimer.runningTimerStops.Contain(cName)
assert.True(t, loaded) assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{cName}) chManager.stateTimer.removeTimers([]string{cName})
}) })
@ -370,7 +370,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, nodeID, cName, collectionID) waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, nodeID, cName, collectionID)
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimerStops.Load(cName) loaded := chManager.stateTimer.runningTimerStops.Contain(cName)
return loaded return loaded
}, waitFor, tick) }, waitFor, tick)
@ -811,7 +811,7 @@ func TestChannelManager_Reload(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
chManager.checkOldNodes([]UniqueID{nodeID}) chManager.checkOldNodes([]UniqueID{nodeID})
_, ok := chManager.stateTimer.runningTimerStops.Load(channelName) ok := chManager.stateTimer.runningTimerStops.Contain(channelName)
assert.True(t, ok) assert.True(t, ok)
chManager.stateTimer.removeTimers([]string{channelName}) chManager.stateTimer.removeTimers([]string{channelName})
}) })
@ -827,7 +827,7 @@ func TestChannelManager_Reload(t *testing.T) {
err = chManager.checkOldNodes([]UniqueID{nodeID}) err = chManager.checkOldNodes([]UniqueID{nodeID})
assert.NoError(t, err) assert.NoError(t, err)
_, ok := chManager.stateTimer.runningTimerStops.Load(channelName) ok := chManager.stateTimer.runningTimerStops.Contain(channelName)
assert.True(t, ok) assert.True(t, ok)
chManager.stateTimer.removeTimers([]string{channelName}) chManager.stateTimer.removeTimers([]string{channelName})
}) })

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -251,7 +252,7 @@ func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateR
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
ctx := context.Background() ctx := context.Background()
plans := sync.Map{} plans := typeutil.NewConcurrentMap[int64, *datapb.CompactionStateResult]()
c.sessions.RLock() c.sessions.RLock()
for nodeID, s := range c.sessions.data { for nodeID, s := range c.sessions.data {
wg.Add(1) wg.Add(1)
@ -280,7 +281,7 @@ func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateR
return return
} }
for _, rst := range resp.GetResults() { for _, rst := range resp.GetResults() {
plans.Store(rst.PlanID, rst) plans.Insert(rst.PlanID, rst)
} }
}(nodeID, s) }(nodeID, s)
} }
@ -288,8 +289,8 @@ func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateR
wg.Wait() wg.Wait()
rst := make(map[int64]*datapb.CompactionStateResult) rst := make(map[int64]*datapb.CompactionStateResult)
plans.Range(func(key, value any) bool { plans.Range(func(planID int64, result *datapb.CompactionStateResult) bool {
rst[key.(int64)] = value.(*datapb.CompactionStateResult) rst[planID] = result
return true return true
}) })

View File

@ -16,9 +16,7 @@
package datanode package datanode
import ( import "github.com/milvus-io/milvus/pkg/util/typeutil"
"sync"
)
// Cache stores flushing segments' ids to prevent flushing the same segment again and again. // Cache stores flushing segments' ids to prevent flushing the same segment again and again.
// //
@ -28,37 +26,33 @@ import (
// After the flush procedure, whether the segment successfully flushed or not, // After the flush procedure, whether the segment successfully flushed or not,
// it'll be removed from the cache. So if flush failed, the secondary flush can be triggered. // it'll be removed from the cache. So if flush failed, the secondary flush can be triggered.
type Cache struct { type Cache struct {
cacheMap sync.Map *typeutil.ConcurrentSet[UniqueID]
} }
// newCache returns a new Cache // newCache returns a new Cache
func newCache() *Cache { func newCache() *Cache {
return &Cache{ return &Cache{
cacheMap: sync.Map{}, ConcurrentSet: typeutil.NewConcurrentSet[UniqueID](),
} }
} }
// checkIfCached returns whether unique id is in cache // checkIfCached returns whether unique id is in cache
func (c *Cache) checkIfCached(key UniqueID) bool { func (c *Cache) checkIfCached(key UniqueID) bool {
_, ok := c.cacheMap.Load(key) return c.Contain(key)
return ok
} }
// Cache caches a specific ID into the cache // Cache caches a specific ID into the cache
func (c *Cache) Cache(ID UniqueID) { func (c *Cache) Cache(ID UniqueID) {
c.cacheMap.Store(ID, struct{}{}) c.Insert(ID)
} }
// checkOrCache returns true if `key` is present. // checkOrCache returns true if `key` is present.
// Otherwise, it returns false and stores `key` into cache. // Otherwise, it returns false and stores `key` into cache.
func (c *Cache) checkOrCache(key UniqueID) bool { func (c *Cache) checkOrCache(key UniqueID) bool {
_, exist := c.cacheMap.LoadOrStore(key, struct{}{}) return !c.Insert(key)
return exist
} }
// Remove removes a set of IDs from the cache // Remove removes a set of IDs from the cache
func (c *Cache) Remove(IDs ...UniqueID) { func (c *Cache) Remove(IDs ...UniqueID) {
for _, id := range IDs { c.ConcurrentSet.Remove(IDs...)
c.cacheMap.Delete(id)
}
} }

View File

@ -18,12 +18,12 @@ package datanode
import ( import (
"context" "context"
"sync"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
const ( const (
@ -31,17 +31,20 @@ const (
) )
type compactionExecutor struct { type compactionExecutor struct {
executing sync.Map // planID to compactor executing *typeutil.ConcurrentMap[int64, compactor] // planID to compactor
completedCompactor sync.Map // planID to compactor completedCompactor *typeutil.ConcurrentMap[int64, compactor] // planID to compactor
completed sync.Map // planID to CompactionResult completed *typeutil.ConcurrentMap[int64, *datapb.CompactionResult] // planID to CompactionResult
taskCh chan compactor taskCh chan compactor
dropped sync.Map // vchannel dropped dropped *typeutil.ConcurrentSet[string] // vchannel dropped
} }
func newCompactionExecutor() *compactionExecutor { func newCompactionExecutor() *compactionExecutor {
return &compactionExecutor{ return &compactionExecutor{
executing: sync.Map{}, executing: typeutil.NewConcurrentMap[int64, compactor](),
taskCh: make(chan compactor, maxTaskNum), completedCompactor: typeutil.NewConcurrentMap[int64, compactor](),
completed: typeutil.NewConcurrentMap[int64, *datapb.CompactionResult](),
taskCh: make(chan compactor, maxTaskNum),
dropped: typeutil.NewConcurrentSet[string](),
} }
} }
@ -51,19 +54,19 @@ func (c *compactionExecutor) execute(task compactor) {
} }
func (c *compactionExecutor) toExecutingState(task compactor) { func (c *compactionExecutor) toExecutingState(task compactor) {
c.executing.Store(task.getPlanID(), task) c.executing.Insert(task.getPlanID(), task)
} }
func (c *compactionExecutor) toCompleteState(task compactor) { func (c *compactionExecutor) toCompleteState(task compactor) {
task.complete() task.complete()
c.executing.Delete(task.getPlanID()) c.executing.GetAndRemove(task.getPlanID())
} }
func (c *compactionExecutor) injectDone(planID UniqueID, success bool) { func (c *compactionExecutor) injectDone(planID UniqueID, success bool) {
c.completed.Delete(planID) c.completed.GetAndRemove(planID)
task, loaded := c.completedCompactor.LoadAndDelete(planID) task, loaded := c.completedCompactor.GetAndRemove(planID)
if loaded { if loaded {
task.(compactor).injectDone(success) task.injectDone(success)
} }
} }
@ -97,42 +100,41 @@ func (c *compactionExecutor) executeTask(task compactor) {
zap.Error(err), zap.Error(err),
) )
} else { } else {
c.completed.Store(task.getPlanID(), result) c.completed.Insert(task.getPlanID(), result)
c.completedCompactor.Store(task.getPlanID(), task) c.completedCompactor.Insert(task.getPlanID(), task)
} }
log.Info("end to execute compaction", zap.Int64("planID", task.getPlanID())) log.Info("end to execute compaction", zap.Int64("planID", task.getPlanID()))
} }
func (c *compactionExecutor) stopTask(planID UniqueID) { func (c *compactionExecutor) stopTask(planID UniqueID) {
task, loaded := c.executing.LoadAndDelete(planID) task, loaded := c.executing.GetAndRemove(planID)
if loaded { if loaded {
log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.(compactor).getChannelName())) log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.getChannelName()))
task.(compactor).stop() task.stop()
} }
} }
func (c *compactionExecutor) channelValidateForCompaction(vChannelName string) bool { func (c *compactionExecutor) channelValidateForCompaction(vChannelName string) bool {
// if vchannel marked dropped, compaction should not proceed // if vchannel marked dropped, compaction should not proceed
_, loaded := c.dropped.Load(vChannelName) return !c.dropped.Contain(vChannelName)
return !loaded
} }
func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string) { func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string) {
c.dropped.Store(vChannelName, struct{}{}) c.dropped.Insert(vChannelName)
c.executing.Range(func(key interface{}, value interface{}) bool { c.executing.Range(func(planID int64, task compactor) bool {
if value.(compactor).getChannelName() == vChannelName { if task.getChannelName() == vChannelName {
c.stopTask(key.(UniqueID)) c.stopTask(planID)
} }
return true return true
}) })
// remove all completed plans for vChannelName // remove all completed plans for vChannelName
c.completed.Range(func(key interface{}, value interface{}) bool { c.completed.Range(func(planID int64, result *datapb.CompactionResult) bool {
if value.(*datapb.CompactionResult).GetChannel() == vChannelName { if result.GetChannel() == vChannelName {
c.injectDone(key.(UniqueID), true) c.injectDone(planID, true)
log.Info("remove compaction results for dropped channel", log.Info("remove compaction results for dropped channel",
zap.String("channel", vChannelName), zap.String("channel", vChannelName),
zap.Int64("planID", key.(UniqueID))) zap.Int64("planID", planID))
} }
return true return true
}) })

View File

@ -103,7 +103,7 @@ func TestCompactionExecutor(t *testing.T) {
// wait for task enqueued // wait for task enqueued
found := false found := false
for !found { for !found {
_, found = ex.executing.Load(mc.getPlanID()) found = ex.executing.Contain(mc.getPlanID())
} }
ex.stopExecutingtaskByVChannelName("mock") ex.stopExecutingtaskByVChannelName("mock")

View File

@ -21,7 +21,6 @@ import (
"fmt" "fmt"
"math" "math"
"reflect" "reflect"
"sync"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -54,7 +53,6 @@ type insertBufferNode struct {
channel Channel channel Channel
idAllocator allocator.Allocator idAllocator allocator.Allocator
flushMap sync.Map
flushChan <-chan flushMsg flushChan <-chan flushMsg
resendTTChan <-chan resendTTMsg resendTTChan <-chan resendTTMsg
flushingSegCache *Cache flushingSegCache *Cache
@ -700,7 +698,6 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *De
ctx: ctx, ctx: ctx,
BaseNode: baseNode, BaseNode: baseNode,
flushMap: sync.Map{},
flushChan: flushCh, flushChan: flushCh,
resendTTChan: resendTTCh, resendTTChan: resendTTCh,
flushingSegCache: flushingSegCache, flushingSegCache: flushingSegCache,
@ -767,7 +764,6 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *De
BaseNode: baseNode, BaseNode: baseNode,
timeTickStream: wTtMsgStream, timeTickStream: wTtMsgStream,
flushMap: sync.Map{},
flushChan: flushCh, flushChan: flushCh,
resendTTChan: resendTTCh, resendTTChan: resendTTCh,
flushingSegCache: flushingSegCache, flushingSegCache: flushingSegCache,

View File

@ -43,6 +43,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
// flushManager defines a flush manager signature // flushManager defines a flush manager signature
@ -97,7 +98,7 @@ type orderFlushQueue struct {
injectCh chan *taskInjection injectCh chan *taskInjection
// MsgID => flushTask // MsgID => flushTask
working sync.Map working *typeutil.ConcurrentMap[string, *flushTaskRunner]
notifyFunc notifyMetaFunc notifyFunc notifyMetaFunc
tailMut sync.Mutex tailMut sync.Mutex
@ -114,6 +115,7 @@ func newOrderFlushQueue(segID UniqueID, f notifyMetaFunc) *orderFlushQueue {
segmentID: segID, segmentID: segID,
notifyFunc: f, notifyFunc: f,
injectCh: make(chan *taskInjection, 100), injectCh: make(chan *taskInjection, 100),
working: typeutil.NewConcurrentMap[string, *flushTaskRunner](),
} }
return q return q
} }
@ -128,8 +130,7 @@ func (q *orderFlushQueue) init() {
} }
func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskRunner { func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskRunner {
actual, loaded := q.working.LoadOrStore(getSyncTaskID(pos), newFlushTaskRunner(q.segmentID, q.injectCh)) t, loaded := q.working.GetOrInsert(getSyncTaskID(pos), newFlushTaskRunner(q.segmentID, q.injectCh))
t := actual.(*flushTaskRunner)
// not loaded means the task runner is new, do initializtion // not loaded means the task runner is new, do initializtion
if !loaded { if !loaded {
// take over injection if task queue is handling it // take over injection if task queue is handling it
@ -152,7 +153,7 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskR
// postTask handles clean up work after a task is done // postTask handles clean up work after a task is done
func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc) { func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc) {
// delete task from working map // delete task from working map
q.working.Delete(getSyncTaskID(pack.pos)) q.working.GetAndRemove(getSyncTaskID(pack.pos))
// after descreasing working count, check whether flush queue is empty // after descreasing working count, check whether flush queue is empty
q.injectMut.Lock() q.injectMut.Lock()
q.runningTasks-- q.runningTasks--
@ -271,7 +272,7 @@ type rendezvousFlushManager struct {
Channel Channel
// segment id => flush queue // segment id => flush queue
dispatcher sync.Map dispatcher *typeutil.ConcurrentMap[int64, *orderFlushQueue]
notifyFunc notifyMetaFunc notifyFunc notifyMetaFunc
dropping atomic.Bool dropping atomic.Bool
@ -281,9 +282,7 @@ type rendezvousFlushManager struct {
// getFlushQueue gets or creates an orderFlushQueue for segment id if not found // getFlushQueue gets or creates an orderFlushQueue for segment id if not found
func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQueue { func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQueue {
newQueue := newOrderFlushQueue(segmentID, m.notifyFunc) newQueue := newOrderFlushQueue(segmentID, m.notifyFunc)
actual, _ := m.dispatcher.LoadOrStore(segmentID, newQueue) queue, _ := m.dispatcher.GetOrInsert(segmentID, newQueue)
// all operation on dispatcher is private, assertion ok guaranteed
queue := actual.(*orderFlushQueue)
queue.init() queue.init()
return queue return queue
} }
@ -321,7 +320,7 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush
if m.dropping.Load() { if m.dropping.Load() {
// preventing separate delete, check position exists in queue first // preventing separate delete, check position exists in queue first
q := m.getFlushQueue(segmentID) q := m.getFlushQueue(segmentID)
_, ok := q.working.Load(getSyncTaskID(pos)) _, ok := q.working.Get(getSyncTaskID(pos))
// if ok, means position insert data already in queue, just handle task in normal mode // if ok, means position insert data already in queue, just handle task in normal mode
// if not ok, means the insert buf should be handle in drop mode // if not ok, means the insert buf should be handle in drop mode
if !ok { if !ok {
@ -422,12 +421,8 @@ func (m *rendezvousFlushManager) serializePkStatsLog(segmentID int64, flushed bo
// isFull return true if the task pool is full // isFull return true if the task pool is full
func (m *rendezvousFlushManager) isFull() bool { func (m *rendezvousFlushManager) isFull() bool {
var num int var num int
m.dispatcher.Range(func(_, q any) bool { m.dispatcher.Range(func(_ int64, queue *orderFlushQueue) bool {
queue := q.(*orderFlushQueue) num += queue.working.Len()
queue.working.Range(func(_, _ any) bool {
num++
return true
})
return true return true
}) })
return num >= Params.DataNodeCfg.MaxParallelSyncTaskNum.GetAsInt() return num >= Params.DataNodeCfg.MaxParallelSyncTaskNum.GetAsInt()
@ -605,8 +600,7 @@ func (m *rendezvousFlushManager) getSegmentMeta(segmentID UniqueID, pos *msgpb.M
// waitForAllTaskQueue waits for all flush queues in dispatcher become empty // waitForAllTaskQueue waits for all flush queues in dispatcher become empty
func (m *rendezvousFlushManager) waitForAllFlushQueue() { func (m *rendezvousFlushManager) waitForAllFlushQueue() {
var wg sync.WaitGroup var wg sync.WaitGroup
m.dispatcher.Range(func(k, v interface{}) bool { m.dispatcher.Range(func(segmentID int64, queue *orderFlushQueue) bool {
queue := v.(*orderFlushQueue)
wg.Add(1) wg.Add(1)
go func() { go func() {
<-queue.tailCh <-queue.tailCh
@ -652,9 +646,8 @@ func getSyncTaskID(pos *msgpb.MsgPosition) string {
// close cleans up all the left members // close cleans up all the left members
func (m *rendezvousFlushManager) close() { func (m *rendezvousFlushManager) close() {
m.dispatcher.Range(func(k, v interface{}) bool { m.dispatcher.Range(func(segmentID int64, queue *orderFlushQueue) bool {
//assertion ok //assertion ok
queue := v.(*orderFlushQueue)
queue.injectMut.Lock() queue.injectMut.Lock()
for i := 0; i < len(queue.injectCh); i++ { for i := 0; i < len(queue.injectCh); i++ {
go queue.handleInject(<-queue.injectCh) go queue.handleInject(<-queue.injectCh)
@ -721,6 +714,7 @@ func NewRendezvousFlushManager(allocator allocator.Allocator, cm storage.ChunkMa
dropHandler: dropHandler{ dropHandler: dropHandler{
flushAndDrop: drop, flushAndDrop: drop,
}, },
dispatcher: typeutil.NewConcurrentMap[int64, *orderFlushQueue](),
} }
// start with normal mode // start with normal mode
fm.dropping.Store(false) fm.dropping.Store(false)

View File

@ -309,18 +309,18 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
}, nil }, nil
} }
results := make([]*datapb.CompactionStateResult, 0) results := make([]*datapb.CompactionStateResult, 0)
node.compactionExecutor.executing.Range(func(k, v any) bool { node.compactionExecutor.executing.Range(func(planID int64, task compactor) bool {
results = append(results, &datapb.CompactionStateResult{ results = append(results, &datapb.CompactionStateResult{
State: commonpb.CompactionState_Executing, State: commonpb.CompactionState_Executing,
PlanID: k.(UniqueID), PlanID: planID,
}) })
return true return true
}) })
node.compactionExecutor.completed.Range(func(k, v any) bool { node.compactionExecutor.completed.Range(func(planID int64, result *datapb.CompactionResult) bool {
results = append(results, &datapb.CompactionStateResult{ results = append(results, &datapb.CompactionStateResult{
State: commonpb.CompactionState_Completed, State: commonpb.CompactionState_Completed,
PlanID: k.(UniqueID), PlanID: planID,
Result: v.(*datapb.CompactionResult), Result: result,
}) })
return true return true
}) })

View File

@ -146,9 +146,9 @@ func (s *DataNodeServicesSuite) TestGetComponentStates() {
func (s *DataNodeServicesSuite) TestGetCompactionState() { func (s *DataNodeServicesSuite) TestGetCompactionState() {
s.Run("success", func() { s.Run("success", func() {
s.node.compactionExecutor.executing.Store(int64(3), 0) s.node.compactionExecutor.executing.Insert(int64(3), newMockCompactor(true))
s.node.compactionExecutor.executing.Store(int64(2), 0) s.node.compactionExecutor.executing.Insert(int64(2), newMockCompactor(true))
s.node.compactionExecutor.completed.Store(int64(1), &datapb.CompactionResult{ s.node.compactionExecutor.completed.Insert(int64(1), &datapb.CompactionResult{
PlanID: 1, PlanID: 1,
SegmentID: 10, SegmentID: 10,
}) })
@ -169,16 +169,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
s.Assert().Equal(1, cnt) s.Assert().Equal(1, cnt)
mu.Unlock() mu.Unlock()
mu.Lock() s.Assert().Equal(1, s.node.compactionExecutor.completed.Len())
cnt = 0
mu.Unlock()
s.node.compactionExecutor.completed.Range(func(k, v any) bool {
mu.Lock()
cnt++
mu.Unlock()
return true
})
s.Assert().Equal(1, cnt)
}) })
s.Run("unhealthy", func() { s.Run("unhealthy", func() {

View File

@ -3,25 +3,31 @@ package indexnode
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
type StorageFactory interface { type StorageFactory interface {
NewChunkManager(ctx context.Context, config *indexpb.StorageConfig) (storage.ChunkManager, error) NewChunkManager(ctx context.Context, config *indexpb.StorageConfig) (storage.ChunkManager, error)
} }
type chunkMgr struct { type chunkMgrFactory struct {
cached sync.Map cached *typeutil.ConcurrentMap[string, storage.ChunkManager]
} }
func (m *chunkMgr) NewChunkManager(ctx context.Context, config *indexpb.StorageConfig) (storage.ChunkManager, error) { func NewChunkMgrFactory() *chunkMgrFactory {
return &chunkMgrFactory{
cached: typeutil.NewConcurrentMap[string, storage.ChunkManager](),
}
}
func (m *chunkMgrFactory) NewChunkManager(ctx context.Context, config *indexpb.StorageConfig) (storage.ChunkManager, error) {
key := m.cacheKey(config.StorageType, config.BucketName, config.Address) key := m.cacheKey(config.StorageType, config.BucketName, config.Address)
if v, ok := m.cached.Load(key); ok { if v, ok := m.cached.Get(key); ok {
return v.(storage.ChunkManager), nil return v, nil
} }
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params) chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
@ -29,11 +35,11 @@ func (m *chunkMgr) NewChunkManager(ctx context.Context, config *indexpb.StorageC
if err != nil { if err != nil {
return nil, err return nil, err
} }
v, _ := m.cached.LoadOrStore(key, mgr) v, _ := m.cached.GetOrInsert(key, mgr)
log.Ctx(ctx).Info("index node successfully init chunk manager") log.Ctx(ctx).Info("index node successfully init chunk manager")
return v.(storage.ChunkManager), nil return v, nil
} }
func (m *chunkMgr) cacheKey(storageType, bucket, address string) string { func (m *chunkMgrFactory) cacheKey(storageType, bucket, address string) string {
return fmt.Sprintf("%s/%s/%s", storageType, bucket, address) return fmt.Sprintf("%s/%s/%s", storageType, bucket, address)
} }

View File

@ -111,7 +111,7 @@ func NewIndexNode(ctx context.Context, factory dependency.Factory) *IndexNode {
loopCtx: ctx1, loopCtx: ctx1,
loopCancel: cancel, loopCancel: cancel,
factory: factory, factory: factory,
storageFactory: &chunkMgr{}, storageFactory: NewChunkMgrFactory(),
tasks: map[taskKey]*taskInfo{}, tasks: map[taskKey]*taskInfo{},
lifetime: lifetime.NewLifetime(commonpb.StateCode_Abnormal), lifetime: lifetime.NewLifetime(commonpb.StateCode_Abnormal),
} }

View File

@ -417,7 +417,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
rmq.retentionInfo.mutex.Lock() rmq.retentionInfo.mutex.Lock()
defer rmq.retentionInfo.mutex.Unlock() defer rmq.retentionInfo.mutex.Unlock()
rmq.retentionInfo.topicRetetionTime.Store(topicName, time.Now().Unix()) rmq.retentionInfo.topicRetetionTime.Insert(topicName, time.Now().Unix())
log.Debug("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) log.Debug("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil return nil
} }
@ -480,7 +480,7 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
// clean up retention info // clean up retention info
topicMu.Delete(topicName) topicMu.Delete(topicName)
rmq.retentionInfo.topicRetetionTime.Delete(topicName) rmq.retentionInfo.topicRetetionTime.GetAndRemove(topicName)
log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil return nil

View File

@ -34,7 +34,7 @@ const (
type retentionInfo struct { type retentionInfo struct {
// key is topic name, value is last retention time // key is topic name, value is last retention time
topicRetetionTime sync.Map topicRetetionTime *typeutil.ConcurrentMap[string, int64]
mutex sync.RWMutex mutex sync.RWMutex
kv *rocksdbkv.RocksdbKV kv *rocksdbkv.RocksdbKV
@ -47,7 +47,7 @@ type retentionInfo struct {
func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) { func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) {
ri := &retentionInfo{ ri := &retentionInfo{
topicRetetionTime: sync.Map{}, topicRetetionTime: typeutil.NewConcurrentMap[string, int64](),
mutex: sync.RWMutex{}, mutex: sync.RWMutex{},
kv: kv, kv: kv,
db: db, db: db,
@ -61,7 +61,7 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
} }
for _, key := range topicKeys { for _, key := range topicKeys {
topic := key[len(TopicIDTitle):] topic := key[len(TopicIDTitle):]
ri.topicRetetionTime.Store(topic, time.Now().Unix()) ri.topicRetetionTime.Insert(topic, time.Now().Unix())
topicMu.Store(topic, new(sync.Mutex)) topicMu.Store(topic, new(sync.Mutex))
} }
return ri, nil return ri, nil
@ -99,19 +99,13 @@ func (ri *retentionInfo) retention() error {
timeNow := t.Unix() timeNow := t.Unix()
checkTime := int64(params.RocksmqCfg.RetentionTimeInMinutes.GetAsFloat() * 60 / 10) checkTime := int64(params.RocksmqCfg.RetentionTimeInMinutes.GetAsFloat() * 60 / 10)
ri.mutex.RLock() ri.mutex.RLock()
ri.topicRetetionTime.Range(func(k, v interface{}) bool { ri.topicRetetionTime.Range(func(topic string, lastRetentionTs int64) bool {
topic, _ := k.(string)
lastRetentionTs, ok := v.(int64)
if !ok {
log.Warn("Can't parse lastRetention to int64", zap.String("topic", topic), zap.Any("value", v))
return true
}
if lastRetentionTs+checkTime < timeNow { if lastRetentionTs+checkTime < timeNow {
err := ri.expiredCleanUp(topic) err := ri.expiredCleanUp(topic)
if err != nil { if err != nil {
log.Warn("Retention expired clean failed", zap.Any("error", err)) log.Warn("Retention expired clean failed", zap.Error(err))
} }
ri.topicRetetionTime.Store(topic, timeNow) ri.topicRetetionTime.Insert(topic, timeNow)
} }
return true return true
}) })

View File

@ -18,7 +18,6 @@ package proxy
import ( import (
"context" "context"
"sync"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -247,7 +246,7 @@ func repackInsertDataWithPartitionKey(ctx context.Context,
} }
errGroup, _ := errgroup.WithContext(ctx) errGroup, _ := errgroup.WithContext(ctx)
partition2Msgs := sync.Map{} partition2Msgs := typeutil.NewConcurrentMap[string, []msgstream.TsMsg]()
for partitionName, offsets := range partition2RowOffsets { for partitionName, offsets := range partition2RowOffsets {
partitionName := partitionName partitionName := partitionName
offsets := offsets offsets := offsets
@ -257,7 +256,7 @@ func repackInsertDataWithPartitionKey(ctx context.Context,
return err return err
} }
partition2Msgs.Store(partitionName, msgs) partition2Msgs.Insert(partitionName, msgs)
return nil return nil
}) })
} }
@ -271,8 +270,7 @@ func repackInsertDataWithPartitionKey(ctx context.Context,
return nil, err return nil, err
} }
partition2Msgs.Range(func(k, v interface{}) bool { partition2Msgs.Range(func(name string, msgs []msgstream.TsMsg) bool {
msgs := v.([]msgstream.TsMsg)
msgPack.Msgs = append(msgPack.Msgs, msgs...) msgPack.Msgs = append(msgPack.Msgs, msgs...)
return true return true
}) })

View File

@ -22,6 +22,7 @@ import (
"time" "time"
"github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
@ -48,7 +49,7 @@ type Executor struct {
// Merge load segment requests // Merge load segment requests
merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest] merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest]
executingTasks sync.Map executingTasks *typeutil.ConcurrentSet[int64] // taskID
executingTaskNum atomic.Int32 executingTaskNum atomic.Int32
} }
@ -68,7 +69,7 @@ func NewExecutor(meta *meta.Meta,
nodeMgr: nodeMgr, nodeMgr: nodeMgr,
merger: NewMerger[segmentIndex, *querypb.LoadSegmentsRequest](), merger: NewMerger[segmentIndex, *querypb.LoadSegmentsRequest](),
executingTasks: sync.Map{}, executingTasks: typeutil.NewConcurrentSet[int64](),
} }
} }
@ -86,12 +87,12 @@ func (ex *Executor) Stop() {
// does nothing and returns false if the action is already committed, // does nothing and returns false if the action is already committed,
// returns true otherwise. // returns true otherwise.
func (ex *Executor) Execute(task Task, step int) bool { func (ex *Executor) Execute(task Task, step int) bool {
_, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{}) exist := !ex.executingTasks.Insert(task.ID())
if exist { if exist {
return false return false
} }
if ex.executingTaskNum.Inc() > Params.QueryCoordCfg.TaskExecutionCap.GetAsInt32() { if ex.executingTaskNum.Inc() > Params.QueryCoordCfg.TaskExecutionCap.GetAsInt32() {
ex.executingTasks.Delete(task.ID()) ex.executingTasks.Remove(task.ID())
ex.executingTaskNum.Dec() ex.executingTaskNum.Dec()
return false return false
} }
@ -119,8 +120,7 @@ func (ex *Executor) Execute(task Task, step int) bool {
} }
func (ex *Executor) Exist(taskID int64) bool { func (ex *Executor) Exist(taskID int64) bool {
_, ok := ex.executingTasks.Load(taskID) return ex.executingTasks.Contain(taskID)
return ok
} }
func (ex *Executor) scheduleRequests() { func (ex *Executor) scheduleRequests() {
@ -207,7 +207,7 @@ func (ex *Executor) removeTask(task Task, step int) {
zap.Error(task.Err())) zap.Error(task.Err()))
} }
ex.executingTasks.Delete(task.ID()) ex.executingTasks.Remove(task.ID())
ex.executingTaskNum.Dec() ex.executingTaskNum.Dec()
} }

View File

@ -1191,8 +1191,8 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
keys = make([]any, 0) keys = make([]any, 0)
for _, executor := range suite.scheduler.executors { for _, executor := range suite.scheduler.executors {
executor.executingTasks.Range(func(key, value any) bool { executor.executingTasks.Range(func(taskID int64) bool {
keys = append(keys, key) keys = append(keys, taskID)
count++ count++
return true return true
}) })

View File

@ -142,7 +142,7 @@ type dmlChannels struct {
namePrefix string namePrefix string
capacity int64 capacity int64
// pool maintains channelName => dmlMsgStream mapping, stable // pool maintains channelName => dmlMsgStream mapping, stable
pool sync.Map pool *typeutil.ConcurrentMap[string, *dmlMsgStream]
// mut protects channelsHeap only // mut protects channelsHeap only
mut sync.Mutex mut sync.Mutex
// channelsHeap is the heap to pop next dms for use // channelsHeap is the heap to pop next dms for use
@ -174,6 +174,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
namePrefix: chanNamePrefix, namePrefix: chanNamePrefix,
capacity: chanNum, capacity: chanNum,
channelsHeap: make([]*dmlMsgStream, 0, chanNum), channelsHeap: make([]*dmlMsgStream, 0, chanNum),
pool: typeutil.NewConcurrentMap[string, *dmlMsgStream](),
} }
for i, name := range names { for i, name := range names {
@ -206,7 +207,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
idx: int64(i), idx: int64(i),
pos: i, pos: i,
} }
d.pool.Store(name, dms) d.pool.Insert(name, dms)
d.channelsHeap = append(d.channelsHeap, dms) d.channelsHeap = append(d.channelsHeap, dms)
} }
@ -247,8 +248,7 @@ func (d *dmlChannels) listChannels() []string {
var chanNames []string var chanNames []string
d.pool.Range( d.pool.Range(
func(k, v interface{}) bool { func(channel string, dms *dmlMsgStream) bool {
dms := v.(*dmlMsgStream)
if dms.RefCnt() > 0 { if dms.RefCnt() > 0 {
chanNames = append(chanNames, getChannelName(d.namePrefix, dms.idx)) chanNames = append(chanNames, getChannelName(d.namePrefix, dms.idx))
} }
@ -262,12 +262,12 @@ func (d *dmlChannels) getChannelNum() int {
} }
func (d *dmlChannels) getMsgStreamByName(chanName string) (*dmlMsgStream, error) { func (d *dmlChannels) getMsgStreamByName(chanName string) (*dmlMsgStream, error) {
v, ok := d.pool.Load(chanName) dms, ok := d.pool.Get(chanName)
if !ok { if !ok {
log.Error("invalid channelName", zap.String("chanName", chanName)) log.Error("invalid channelName", zap.String("chanName", chanName))
return nil, errors.Newf("invalid channel name: %s", chanName) return nil, errors.Newf("invalid channel name: %s", chanName)
} }
return v.(*dmlMsgStream), nil return dms, nil
} }
func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) error { func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) error {

View File

@ -47,28 +47,31 @@ var (
) )
type ttHistogram struct { type ttHistogram struct {
sync.Map *typeutil.ConcurrentMap[string, Timestamp]
} }
func newTtHistogram() *ttHistogram { func newTtHistogram() *ttHistogram {
return &ttHistogram{} return &ttHistogram{
ConcurrentMap: typeutil.NewConcurrentMap[string, Timestamp](),
}
} }
func (h *ttHistogram) update(channel string, ts Timestamp) { func (h *ttHistogram) update(channel string, ts Timestamp) {
h.Store(channel, ts) h.Insert(channel, ts)
} }
func (h *ttHistogram) get(channel string) Timestamp { func (h *ttHistogram) get(channel string) Timestamp {
ts, ok := h.Load(channel) ts, ok := h.Get(channel)
if !ok { if !ok {
return typeutil.ZeroTimestamp return typeutil.ZeroTimestamp
} }
return ts.(Timestamp) return ts
} }
func (h *ttHistogram) remove(channels ...string) { func (h *ttHistogram) remove(channels ...string) {
for _, channel := range channels { for _, channel := range channels {
h.Delete(channel) h.GetAndRemove(channel)
} }
} }

View File

@ -17,8 +17,6 @@
package msgdispatcher package msgdispatcher
import ( import (
"sync"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"go.uber.org/zap" "go.uber.org/zap"
@ -26,6 +24,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
type ( type (
@ -45,15 +44,16 @@ var _ Client = (*client)(nil)
type client struct { type client struct {
role string role string
nodeID int64 nodeID int64
managers sync.Map // pchannel -> DispatcherManager managers *typeutil.ConcurrentMap[string, DispatcherManager]
factory msgstream.Factory factory msgstream.Factory
} }
func NewClient(factory msgstream.Factory, role string, nodeID int64) Client { func NewClient(factory msgstream.Factory, role string, nodeID int64) Client {
return &client{ return &client{
role: role, role: role,
nodeID: nodeID, nodeID: nodeID,
factory: factory, factory: factory,
managers: typeutil.NewConcurrentMap[string, DispatcherManager](),
} }
} }
@ -62,20 +62,17 @@ func (c *client) Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *Msg
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel)) zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
pchannel := funcutil.ToPhysicalChannel(vchannel) pchannel := funcutil.ToPhysicalChannel(vchannel)
var manager DispatcherManager var manager DispatcherManager
res, ok := c.managers.Load(pchannel) manager, ok := c.managers.Get(pchannel)
if !ok { if !ok {
manager = NewDispatcherManager(pchannel, c.role, c.nodeID, c.factory) manager = NewDispatcherManager(pchannel, c.role, c.nodeID, c.factory)
c.managers.Store(pchannel, manager) c.managers.Insert(pchannel, manager)
go manager.Run() go manager.Run()
} else {
manager, _ = res.(DispatcherManager)
} }
ch, err := manager.Add(vchannel, pos, subPos) ch, err := manager.Add(vchannel, pos, subPos)
if err != nil { if err != nil {
if manager.Num() == 0 { if manager.Num() == 0 {
manager.Close() manager.Close()
c.managers.Delete(pchannel) c.managers.GetAndRemove(pchannel)
} }
log.Error("register failed", zap.Error(err)) log.Error("register failed", zap.Error(err))
return nil, err return nil, err
@ -86,12 +83,11 @@ func (c *client) Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *Msg
func (c *client) Deregister(vchannel string) { func (c *client) Deregister(vchannel string) {
pchannel := funcutil.ToPhysicalChannel(vchannel) pchannel := funcutil.ToPhysicalChannel(vchannel)
if res, ok := c.managers.Load(pchannel); ok { if manager, ok := c.managers.Get(pchannel); ok {
manager, _ := res.(DispatcherManager)
manager.Remove(vchannel) manager.Remove(vchannel)
if manager.Num() == 0 { if manager.Num() == 0 {
manager.Close() manager.Close()
c.managers.Delete(pchannel) c.managers.GetAndRemove(pchannel)
} }
log.Info("deregister done", zap.String("role", c.role), log.Info("deregister done", zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel)) zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
@ -101,11 +97,9 @@ func (c *client) Deregister(vchannel string) {
func (c *client) Close() { func (c *client) Close() {
log := log.With(zap.String("role", c.role), log := log.With(zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID)) zap.Int64("nodeID", c.nodeID))
c.managers.Range(func(key, value any) bool { c.managers.Range(func(pchannel string, manager DispatcherManager) bool {
pchannel := key.(string)
manager := value.(DispatcherManager)
log.Info("close manager", zap.String("channel", pchannel)) log.Info("close manager", zap.String("channel", pchannel))
c.managers.Delete(pchannel) c.managers.GetAndRemove(pchannel)
manager.Close() manager.Close()
return true return true
}) })

View File

@ -61,10 +61,6 @@ func TestClient_Concurrency(t *testing.T) {
wg.Wait() wg.Wait()
expected := int(total - deregisterCount.Load()) expected := int(total - deregisterCount.Load())
var n int n := client1.(*client).managers.Len()
client1.(*client).managers.Range(func(_, _ any) bool {
n++
return true
})
assert.Equal(t, expected, n) assert.Equal(t, expected, n)
} }

View File

@ -69,7 +69,7 @@ type Dispatcher struct {
curTs atomic.Uint64 curTs atomic.Uint64
lagNotifyChan chan struct{} lagNotifyChan chan struct{}
lagTargets *sync.Map // vchannel -> *target lagTargets *typeutil.ConcurrentMap[string, *target] // vchannel -> *target
// vchannel -> *target, lock free since we guarantee that // vchannel -> *target, lock free since we guarantee that
// it's modified only after dispatcher paused or terminated // it's modified only after dispatcher paused or terminated
@ -85,7 +85,7 @@ func NewDispatcher(factory msgstream.Factory,
subName string, subName string,
subPos SubPos, subPos SubPos,
lagNotifyChan chan struct{}, lagNotifyChan chan struct{},
lagTargets *sync.Map, lagTargets *typeutil.ConcurrentMap[string, *target],
) (*Dispatcher, error) { ) (*Dispatcher, error) {
log := log.With(zap.String("pchannel", pchannel), log := log.With(zap.String("pchannel", pchannel),
zap.String("subName", subName), zap.Bool("isMain", isMain)) zap.String("subName", subName), zap.Bool("isMain", isMain))
@ -227,7 +227,7 @@ func (d *Dispatcher) work() {
t.pos = pack.StartPositions[0] t.pos = pack.StartPositions[0]
// replace the pChannel with vChannel // replace the pChannel with vChannel
t.pos.ChannelName = t.vchannel t.pos.ChannelName = t.vchannel
d.lagTargets.LoadOrStore(t.vchannel, t) d.lagTargets.Insert(t.vchannel, t)
d.nonBlockingNotify() d.nonBlockingNotify()
delete(d.targets, vchannel) delete(d.targets, vchannel)
log.Warn("lag target notified", zap.Error(err)) log.Warn("lag target notified", zap.Error(err))

View File

@ -54,7 +54,7 @@ type dispatcherManager struct {
pchannel string pchannel string
lagNotifyChan chan struct{} lagNotifyChan chan struct{}
lagTargets *sync.Map // vchannel -> *target lagTargets *typeutil.ConcurrentMap[string, *target] // vchannel -> *target
mu sync.RWMutex // guards mainDispatcher and soloDispatchers mu sync.RWMutex // guards mainDispatcher and soloDispatchers
mainDispatcher *Dispatcher mainDispatcher *Dispatcher
@ -73,7 +73,7 @@ func NewDispatcherManager(pchannel string, role string, nodeID int64, factory ms
nodeID: nodeID, nodeID: nodeID,
pchannel: pchannel, pchannel: pchannel,
lagNotifyChan: make(chan struct{}, 1), lagNotifyChan: make(chan struct{}, 1),
lagTargets: &sync.Map{}, lagTargets: typeutil.NewConcurrentMap[string, *target](),
soloDispatchers: make(map[string]*Dispatcher), soloDispatchers: make(map[string]*Dispatcher),
factory: factory, factory: factory,
closeChan: make(chan struct{}), closeChan: make(chan struct{}),
@ -132,7 +132,7 @@ func (c *dispatcherManager) Remove(vchannel string) {
c.deleteMetric(vchannel) c.deleteMetric(vchannel)
log.Info("remove soloDispatcher done") log.Info("remove soloDispatcher done")
} }
c.lagTargets.Delete(vchannel) c.lagTargets.GetAndRemove(vchannel)
} }
func (c *dispatcherManager) Num() int { func (c *dispatcherManager) Num() int {
@ -170,9 +170,9 @@ func (c *dispatcherManager) Run() {
c.tryMerge() c.tryMerge()
case <-c.lagNotifyChan: case <-c.lagNotifyChan:
c.mu.Lock() c.mu.Lock()
c.lagTargets.Range(func(vchannel, t any) bool { c.lagTargets.Range(func(vchannel string, t *target) bool {
c.split(t.(*target)) c.split(t)
c.lagTargets.Delete(vchannel) c.lagTargets.GetAndRemove(vchannel)
return true return true
}) })
c.mu.Unlock() c.mu.Unlock()

View File

@ -7,9 +7,10 @@ import "C"
import ( import (
"math" "math"
"sync"
"sync/atomic" "sync/atomic"
"unsafe" "unsafe"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
const maxByteArrayLen = math.MaxInt32 const maxByteArrayLen = math.MaxInt32
@ -17,20 +18,20 @@ const maxByteArrayLen = math.MaxInt32
var globalConverter = NewBytesConverter() var globalConverter = NewBytesConverter()
type BytesConverter struct { type BytesConverter struct {
pointers sync.Map // leaseId -> unsafe.Pointer pointers *typeutil.ConcurrentMap[int32, unsafe.Pointer] // leaseId -> unsafe.Pointer
nextLease int32 nextLease int32
} }
func NewBytesConverter() *BytesConverter { func NewBytesConverter() *BytesConverter {
return &BytesConverter{ return &BytesConverter{
pointers: sync.Map{}, pointers: typeutil.NewConcurrentMap[int32, unsafe.Pointer](),
nextLease: 0, nextLease: 0,
} }
} }
func (converter *BytesConverter) add(p unsafe.Pointer) int32 { func (converter *BytesConverter) add(p unsafe.Pointer) int32 {
lease := atomic.AddInt32(&converter.nextLease, 1) lease := atomic.AddInt32(&converter.nextLease, 1)
converter.pointers.Store(lease, p) converter.pointers.Insert(lease, p)
return lease return lease
} }
@ -63,26 +64,19 @@ func (converter *BytesConverter) Release(lease int32) {
} }
func (converter *BytesConverter) Extract(lease int32) unsafe.Pointer { func (converter *BytesConverter) Extract(lease int32) unsafe.Pointer {
pI, ok := converter.pointers.LoadAndDelete(lease) p, ok := converter.pointers.GetAndRemove(lease)
if !ok { if !ok {
panic("try to release the resource that doesn't exist") panic("try to release the resource that doesn't exist")
} }
p, ok := pI.(unsafe.Pointer)
if !ok {
panic("incorrect value type")
}
return p return p
} }
// Make sure only the caller own the converter // Make sure only the caller own the converter
// or this would release someone's memory // or this would release someone's memory
func (converter *BytesConverter) ReleaseAll() { func (converter *BytesConverter) ReleaseAll() {
converter.pointers.Range(func(key, value interface{}) bool { converter.pointers.Range(func(lease int32, pointer unsafe.Pointer) bool {
pointer := value.(unsafe.Pointer) converter.pointers.GetAndRemove(lease)
converter.pointers.Delete(key)
C.free(pointer) C.free(pointer)
return true return true

View File

@ -19,20 +19,22 @@ package timerecord
import ( import (
"sync" "sync"
"time" "time"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
// groups maintains string to GroupChecker // groups maintains string to GroupChecker
var groups sync.Map var groups = typeutil.NewConcurrentMap[string, *GroupChecker]()
// GroupChecker checks members in same group silent for certain period of time // GroupChecker checks members in same group silent for certain period of time
// print warning msg if there are item(s) that not reported // print warning msg if there are item(s) that not reported
type GroupChecker struct { type GroupChecker struct {
groupName string groupName string
d time.Duration // check duration d time.Duration // check duration
t *time.Ticker // internal ticker t *time.Ticker // internal ticker
ch chan struct{} // closing signal ch chan struct{} // closing signal
lastest sync.Map // map member name => lastest report time lastest *typeutil.ConcurrentMap[string, time.Time] // map member name => lastest report time
initOnce sync.Once initOnce sync.Once
stopOnce sync.Once stopOnce sync.Once
@ -52,8 +54,6 @@ func (gc *GroupChecker) init() {
func (gc *GroupChecker) work() { func (gc *GroupChecker) work() {
gc.t = time.NewTicker(gc.d) gc.t = time.NewTicker(gc.d)
defer gc.t.Stop() defer gc.t.Stop()
var name string
var ts time.Time
for { for {
select { select {
@ -63,9 +63,7 @@ func (gc *GroupChecker) work() {
} }
var list []string var list []string
gc.lastest.Range(func(k, v interface{}) bool { gc.lastest.Range(func(name string, ts time.Time) bool {
name = k.(string)
ts = v.(time.Time)
if time.Since(ts) > gc.d { if time.Since(ts) > gc.d {
list = append(list, name) list = append(list, name)
} }
@ -79,19 +77,19 @@ func (gc *GroupChecker) work() {
// Check updates the latest timestamp for provided name // Check updates the latest timestamp for provided name
func (gc *GroupChecker) Check(name string) { func (gc *GroupChecker) Check(name string) {
gc.lastest.Store(name, time.Now()) gc.lastest.Insert(name, time.Now())
} }
// Remove deletes name from watch list // Remove deletes name from watch list
func (gc *GroupChecker) Remove(name string) { func (gc *GroupChecker) Remove(name string) {
gc.lastest.Delete(name) gc.lastest.GetAndRemove(name)
} }
// Stop closes the GroupChecker // Stop closes the GroupChecker
func (gc *GroupChecker) Stop() { func (gc *GroupChecker) Stop() {
gc.stopOnce.Do(func() { gc.stopOnce.Do(func() {
close(gc.ch) close(gc.ch)
groups.Delete(gc.groupName) groups.GetAndRemove(gc.groupName)
}) })
} }
@ -103,12 +101,12 @@ func GetGroupChecker(groupName string, duration time.Duration, fn func([]string)
groupName: groupName, groupName: groupName,
d: duration, d: duration,
fn: fn, fn: fn,
lastest: typeutil.NewConcurrentMap[string, time.Time](),
} }
actual, loaded := groups.LoadOrStore(groupName, gc) gc, loaded := groups.GetOrInsert(groupName, gc)
if !loaded { if !loaded {
gc.init() gc.init()
} }
gc = actual.(*GroupChecker)
return gc return gc
} }