mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 11:29:48 +08:00
Improve proxy leader cache lock logic (#19917)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
7fe8e50689
commit
b15e97a61a
@ -27,6 +27,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
@ -80,23 +81,46 @@ type collectionInfo struct {
|
||||
collID typeutil.UniqueID
|
||||
schema *schemapb.CollectionSchema
|
||||
partInfo map[string]*partitionInfo
|
||||
shardLeaders map[string][]nodeInfo
|
||||
leaderMutex sync.Mutex
|
||||
leaderMutex sync.RWMutex
|
||||
shardLeaders *shardLeaders
|
||||
createdTimestamp uint64
|
||||
createdUtcTimestamp uint64
|
||||
isLoaded bool
|
||||
}
|
||||
|
||||
// CloneShardLeaders returns a copy of shard leaders
|
||||
// leaderMutex shall be accuired before invoking this method
|
||||
func (c *collectionInfo) CloneShardLeaders() map[string][]nodeInfo {
|
||||
m := make(map[string][]nodeInfo)
|
||||
for channel, leaders := range c.shardLeaders {
|
||||
l := make([]nodeInfo, len(leaders))
|
||||
copy(l, leaders)
|
||||
m[channel] = l
|
||||
// shardLeaders wraps shard leader mapping for iteration.
|
||||
type shardLeaders struct {
|
||||
idx *atomic.Int64
|
||||
|
||||
shardLeaders map[string][]nodeInfo
|
||||
}
|
||||
|
||||
type shardLeadersReader struct {
|
||||
leaders *shardLeaders
|
||||
idx int64
|
||||
}
|
||||
|
||||
// Shuffle returns the shuffled shard leader list.
|
||||
func (it shardLeadersReader) Shuffle() map[string][]nodeInfo {
|
||||
result := make(map[string][]nodeInfo)
|
||||
for channel, leaders := range it.leaders.shardLeaders {
|
||||
l := len(leaders)
|
||||
shuffled := make([]nodeInfo, 0, len(leaders))
|
||||
for i := 0; i < l; i++ {
|
||||
shuffled = append(shuffled, leaders[(i+int(it.idx))%l])
|
||||
}
|
||||
result[channel] = shuffled
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// GetReader returns shuffer reader for shard leader.
|
||||
func (sl *shardLeaders) GetReader() shardLeadersReader {
|
||||
idx := sl.idx.Inc()
|
||||
return shardLeadersReader{
|
||||
leaders: sl,
|
||||
idx: idx,
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
type partitionInfo struct {
|
||||
@ -572,14 +596,16 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, collectionNam
|
||||
}
|
||||
|
||||
if withCache {
|
||||
if len(info.shardLeaders) > 0 {
|
||||
info.leaderMutex.Lock()
|
||||
updateShardsWithRoundRobin(info.shardLeaders)
|
||||
var shardLeaders *shardLeaders
|
||||
info.leaderMutex.RLock()
|
||||
shardLeaders = info.shardLeaders
|
||||
info.leaderMutex.RUnlock()
|
||||
|
||||
shards := info.CloneShardLeaders()
|
||||
info.leaderMutex.Unlock()
|
||||
return shards, nil
|
||||
if shardLeaders != nil {
|
||||
iterator := info.shardLeaders.GetReader()
|
||||
return iterator.Shuffle(), nil
|
||||
}
|
||||
|
||||
log.Info("no shard cache for collection, try to get shard leaders from QueryCoord",
|
||||
zap.String("collectionName", collectionName))
|
||||
}
|
||||
@ -625,12 +651,22 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, collectionNam
|
||||
// lock leader
|
||||
info.leaderMutex.Lock()
|
||||
oldShards := info.shardLeaders
|
||||
info.shardLeaders = shards
|
||||
info.shardLeaders = &shardLeaders{
|
||||
shardLeaders: shards,
|
||||
idx: atomic.NewInt64(0),
|
||||
}
|
||||
iterator := info.shardLeaders.GetReader()
|
||||
info.leaderMutex.Unlock()
|
||||
|
||||
ret := iterator.Shuffle()
|
||||
oldLeaders := make(map[string][]nodeInfo)
|
||||
if oldShards != nil {
|
||||
oldLeaders = oldShards.shardLeaders
|
||||
}
|
||||
// update refcnt in shardClientMgr
|
||||
ret := info.CloneShardLeaders()
|
||||
_ = m.shardMgr.UpdateShardLeaders(oldShards, ret)
|
||||
// and create new client for new leaders
|
||||
_ = m.shardMgr.UpdateShardLeaders(oldLeaders, ret)
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
@ -660,8 +696,8 @@ func (m *MetaCache) ClearShards(collectionName string) {
|
||||
}
|
||||
m.mu.Unlock()
|
||||
// delete refcnt in shardClientMgr
|
||||
if ok {
|
||||
_ = m.shardMgr.UpdateShardLeaders(info.shardLeaders, nil)
|
||||
if ok && info.shardLeaders != nil {
|
||||
_ = m.shardMgr.UpdateShardLeaders(info.shardLeaders.shardLeaders, nil)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user