mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
29ddb10e8c
https://github.com/milvus-io/milvus/issues/36132 Signed-off-by: fengjun2016 <jornfeng@gmail.com>
174 lines
4.2 KiB
Go
174 lines
4.2 KiB
Go
package connection
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
|
)
|
|
|
|
type connectionManager struct {
|
|
initOnce sync.Once
|
|
stopOnce sync.Once
|
|
|
|
closeSignal chan struct{}
|
|
wg sync.WaitGroup
|
|
|
|
clientInfos *typeutil.ConcurrentMap[int64, clientInfo]
|
|
}
|
|
|
|
func (s *connectionManager) init() {
|
|
s.initOnce.Do(func() {
|
|
s.wg.Add(1)
|
|
go s.checkLoop()
|
|
})
|
|
}
|
|
|
|
func (s *connectionManager) Stop() {
|
|
s.stopOnce.Do(func() {
|
|
close(s.closeSignal)
|
|
s.wg.Wait()
|
|
})
|
|
}
|
|
|
|
func (s *connectionManager) checkLoop() {
|
|
defer s.wg.Done()
|
|
|
|
t := time.NewTimer(paramtable.Get().ProxyCfg.ConnectionCheckIntervalSeconds.GetAsDuration(time.Second))
|
|
defer t.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-s.closeSignal:
|
|
log.Info("connection manager closed")
|
|
return
|
|
case <-t.C:
|
|
s.removeLongInactiveClients()
|
|
// not sure if we should purge them periodically.
|
|
s.purgeIfNumOfClientsExceed()
|
|
t.Reset(paramtable.Get().ProxyCfg.ConnectionCheckIntervalSeconds.GetAsDuration(time.Second))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *connectionManager) purgeIfNumOfClientsExceed() {
|
|
diffNum := int64(s.clientInfos.Len()) - paramtable.Get().ProxyCfg.MaxConnectionNum.GetAsInt64()
|
|
if diffNum <= 0 {
|
|
return
|
|
}
|
|
|
|
begin := time.Now()
|
|
|
|
log := log.With(
|
|
zap.Int64("num", int64(s.clientInfos.Len())),
|
|
zap.Int64("limit", paramtable.Get().ProxyCfg.MaxConnectionNum.GetAsInt64()))
|
|
|
|
log.Info("number of client infos exceed limit, ready to purge the oldest")
|
|
q := newPriorityQueueWithCap(int(diffNum + 1))
|
|
s.clientInfos.Range(func(identifier int64, info clientInfo) bool {
|
|
heap.Push(&q, newQueryItem(info.identifier, info.lastActiveTime))
|
|
if int64(q.Len()) > diffNum {
|
|
// pop the newest.
|
|
heap.Pop(&q)
|
|
}
|
|
return true
|
|
})
|
|
|
|
// time order doesn't matter here.
|
|
for _, item := range q {
|
|
info, exist := s.clientInfos.GetAndRemove(item.identifier)
|
|
if exist {
|
|
log.Info("remove client info", info.GetLogger()...)
|
|
}
|
|
}
|
|
|
|
log.Info("purge client infos done",
|
|
zap.Duration("cost", time.Since(begin)),
|
|
zap.Int64("num after purge", int64(s.clientInfos.Len())))
|
|
}
|
|
|
|
func (s *connectionManager) Register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) {
|
|
cli := clientInfo{
|
|
ClientInfo: info,
|
|
identifier: identifier,
|
|
lastActiveTime: time.Now(),
|
|
}
|
|
|
|
s.clientInfos.Insert(identifier, cli)
|
|
log.Ctx(ctx).Info("client register", cli.GetLogger()...)
|
|
}
|
|
|
|
func (s *connectionManager) KeepActive(identifier int64) {
|
|
s.Update(identifier)
|
|
}
|
|
|
|
func (s *connectionManager) List() []*commonpb.ClientInfo {
|
|
clients := make([]*commonpb.ClientInfo, 0, s.clientInfos.Len())
|
|
|
|
s.clientInfos.Range(func(identifier int64, info clientInfo) bool {
|
|
if info.ClientInfo != nil {
|
|
client := typeutil.Clone(info.ClientInfo)
|
|
if client.Reserved == nil {
|
|
client.Reserved = make(map[string]string)
|
|
}
|
|
client.Reserved["identifier"] = string(strconv.AppendInt(nil, identifier, 10))
|
|
client.Reserved["last_active_time"] = info.lastActiveTime.String()
|
|
|
|
clients = append(clients, client)
|
|
}
|
|
return true
|
|
})
|
|
|
|
return clients
|
|
}
|
|
|
|
func (s *connectionManager) Get(ctx context.Context) *commonpb.ClientInfo {
|
|
identifier, err := GetIdentifierFromContext(ctx)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
cli, ok := s.clientInfos.Get(identifier)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return cli.ClientInfo
|
|
}
|
|
|
|
func (s *connectionManager) Update(identifier int64) {
|
|
info, ok := s.clientInfos.Get(identifier)
|
|
if ok {
|
|
info.lastActiveTime = time.Now()
|
|
s.clientInfos.Insert(identifier, info)
|
|
}
|
|
}
|
|
|
|
func (s *connectionManager) removeLongInactiveClients() {
|
|
ttl := paramtable.Get().ProxyCfg.ConnectionClientInfoTTLSeconds.GetAsDuration(time.Second)
|
|
s.clientInfos.Range(func(candidate int64, info clientInfo) bool {
|
|
if time.Since(info.lastActiveTime) > ttl {
|
|
log.Info("client deregister", info.GetLogger()...)
|
|
s.clientInfos.Remove(candidate)
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
func newConnectionManager() *connectionManager {
|
|
s := &connectionManager{
|
|
closeSignal: make(chan struct{}, 1),
|
|
clientInfos: typeutil.NewConcurrentMap[int64, clientInfo](),
|
|
}
|
|
s.init()
|
|
|
|
return s
|
|
}
|