// milvus/internal/proxy/connection/manager.go

package connection

import (
	"context"
	"strconv"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/log"
)

const (
	// defaultConnCheckDuration is how often the check loop scans for inactive
	// clients; we shouldn't check this too frequently.
	defaultConnCheckDuration = 2 * time.Minute

	// defaultTTLForInactiveConn is how long a client may stay inactive before
	// it is deregistered.
	defaultTTLForInactiveConn = 24 * time.Hour
)

// connectionManager tracks the clients registered on this proxy and evicts
// those that stay inactive for longer than ttl.
type connectionManager struct {
	mu       sync.RWMutex
	initOnce sync.Once
	stopOnce sync.Once

	closeSignal chan struct{}
	wg          sync.WaitGroup

	// buffer decouples KeepActive callers from the map update done by checkLoop.
	buffer chan int64

	duration time.Duration // interval between inactivity scans
	ttl      time.Duration // maximum allowed inactivity before eviction

	clientInfos map[int64]clientInfo
}

type connectionManagerOption func(s *connectionManager)

// withDuration overrides the interval between inactivity scans.
func withDuration(duration time.Duration) connectionManagerOption {
	return func(s *connectionManager) {
		s.duration = duration
	}
}

// withTTL overrides how long a client may stay inactive before eviction.
func withTTL(ttl time.Duration) connectionManagerOption {
	return func(s *connectionManager) {
		s.ttl = ttl
	}
}

func (s *connectionManager) apply(opts ...connectionManagerOption) {
	for _, opt := range opts {
		opt(s)
	}
}

// init starts the background check loop exactly once.
func (s *connectionManager) init() {
	s.initOnce.Do(func() {
		s.wg.Add(1)
		go s.checkLoop()
	})
}

// Stop signals the check loop to exit and waits for it to finish.
func (s *connectionManager) Stop() {
	s.stopOnce.Do(func() {
		close(s.closeSignal)
		s.wg.Wait()
	})
}

// checkLoop drains keep-alive updates from the buffer and periodically evicts
// clients that have been inactive for longer than ttl.
func (s *connectionManager) checkLoop() {
	defer s.wg.Done()

	t := time.NewTicker(s.duration)
	defer t.Stop()

	for {
		select {
		case <-s.closeSignal:
			log.Info("connection manager closed")
			return
		case identifier := <-s.buffer:
			s.Update(identifier)
		case <-t.C:
			s.removeLongInactiveClients()
		}
	}
}

// Register records a new client connection and its info under identifier.
func (s *connectionManager) Register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) {
	cli := clientInfo{
		ClientInfo:     info,
		identifier:     identifier,
		lastActiveTime: time.Now(),
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.clientInfos[identifier] = cli
	log.Ctx(ctx).Info("client register", cli.GetLogger()...)
}

// KeepActive marks the client as active. The update is pushed to a buffered
// channel and applied by the check loop, so the calling RPC isn't blocked for long.
func (s *connectionManager) KeepActive(identifier int64) {
	s.buffer <- identifier
}

// List returns a copy of every registered client's info, with the identifier
// and last active time stamped into the Reserved map.
func (s *connectionManager) List() []*commonpb.ClientInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()

	clients := make([]*commonpb.ClientInfo, 0, len(s.clientInfos))
	for identifier, cli := range s.clientInfos {
		if cli.ClientInfo != nil {
			client := proto.Clone(cli.ClientInfo).(*commonpb.ClientInfo)
			if client.Reserved == nil {
				client.Reserved = make(map[string]string)
			}
			client.Reserved["identifier"] = strconv.FormatInt(identifier, 10)
			client.Reserved["last_active_time"] = cli.lastActiveTime.String()
			clients = append(clients, client)
		}
	}
	return clients
}

// Get returns the client info registered for the identifier carried in ctx,
// or nil if none is found.
func (s *connectionManager) Get(ctx context.Context) *commonpb.ClientInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()

	identifier, err := GetIdentifierFromContext(ctx)
	if err != nil {
		return nil
	}

	cli, ok := s.clientInfos[identifier]
	if !ok {
		return nil
	}
	return cli.ClientInfo
}

// Update refreshes the client's last active time.
func (s *connectionManager) Update(identifier int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	cli, ok := s.clientInfos[identifier]
	if ok {
		cli.lastActiveTime = time.Now()
		s.clientInfos[identifier] = cli
	}
}

// removeLongInactiveClients deregisters every client whose last activity is
// older than ttl.
func (s *connectionManager) removeLongInactiveClients() {
	s.mu.Lock()
	defer s.mu.Unlock()

	for candidate, cli := range s.clientInfos {
		if time.Since(cli.lastActiveTime) > s.ttl {
			log.Info("client deregister", cli.GetLogger()...)
			delete(s.clientInfos, candidate)
		}
	}
}

// newConnectionManager builds a manager with the default check interval and
// TTL, applies any options, and starts the background check loop.
func newConnectionManager(opts ...connectionManagerOption) *connectionManager {
	s := &connectionManager{
		closeSignal: make(chan struct{}, 1),
		buffer:      make(chan int64, 64),
		duration:    defaultConnCheckDuration,
		ttl:         defaultTTLForInactiveConn,
		clientInfos: make(map[int64]clientInfo),
	}
	s.apply(opts...)
	s.init()
	return s
}
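
// The sketch below is illustrative only and not part of the original file: a
// hypothetical wiring of the manager, assuming the surrounding package's
// clientInfo type and GetIdentifierFromContext helper stay as used above. It
// shows the intended lifecycle: construct with options, register a client,
// keep it alive, list it, then stop the background loop.
func exampleConnectionManagerUsage() {
	// Shorter intervals than the defaults, supplied through the option funcs.
	m := newConnectionManager(
		withDuration(30*time.Second),
		withTTL(10*time.Minute),
	)
	defer m.Stop()

	// Register a client under an arbitrary identifier (values here are made up).
	const identifier int64 = 1
	m.Register(context.Background(), identifier, &commonpb.ClientInfo{
		SdkType:    "hypothetical-sdk",
		SdkVersion: "0.0.0",
	})

	// Heartbeats are buffered and applied asynchronously by checkLoop.
	m.KeepActive(identifier)

	// List returns cloned infos with "identifier" and "last_active_time"
	// stamped into Reserved.
	for _, info := range m.List() {
		_ = info.GetReserved()["identifier"]
	}
}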