milvus/internal/proxynode/segment.go
zhenshan.cao ee0faddb07 Refactor proxy
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
2021-01-29 09:27:26 +08:00

374 lines
9.1 KiB
Go

package proxynode
import (
"container/list"
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const (
SegCountPerRPC = 20000
ActiveTimeDuration = 100 //second
)
type Allocator = allocator.Allocator
type segRequest struct {
allocator.BaseRequest
count uint32
colName string
partitionName string
collID UniqueID
partitionID UniqueID
segInfo map[UniqueID]uint32
channelID int32
timestamp Timestamp
}
type segInfo struct {
segID UniqueID
count uint32
expireTime Timestamp
}
type assignInfo struct {
collID UniqueID
partitionID UniqueID
collName string
partitionName string
channelID int32
segID UniqueID
segInfos *list.List
segCapacity uint32
lastInsertTime time.Time
}
func (info *segInfo) IsExpired(ts Timestamp) bool {
return ts > info.expireTime || info.count <= 0
}
func (info *segInfo) Capacity(ts Timestamp) uint32 {
if info.IsExpired(ts) {
return 0
}
return info.count
}
func (info *segInfo) Assign(ts Timestamp, count uint32) uint32 {
if info.IsExpired(ts) {
return 0
}
ret := uint32(0)
if info.count >= count {
info.count -= count
ret = count
} else {
info.count = 0
ret = info.count
}
return ret
}
func (info *assignInfo) RemoveExpired(ts Timestamp) {
for e := info.segInfos.Front(); e != nil; e = e.Next() {
segInfo := e.Value.(*segInfo)
if segInfo.IsExpired(ts) {
info.segInfos.Remove(e)
}
}
}
func (info *assignInfo) Capacity(ts Timestamp) uint32 {
ret := uint32(0)
for e := info.segInfos.Front(); e != nil; e = e.Next() {
segInfo := e.Value.(*segInfo)
ret += segInfo.Capacity(ts)
}
return ret
}
func (info *assignInfo) Assign(ts Timestamp, count uint32) (map[UniqueID]uint32, error) {
capacity := info.Capacity(ts)
if capacity < count {
errMsg := fmt.Sprintf("AssignSegment Failed: capacity:%d is less than count:%d", capacity, count)
return nil, errors.New(errMsg)
}
result := make(map[UniqueID]uint32)
for e := info.segInfos.Front(); e != nil && count != 0; e = e.Next() {
segInfo := e.Value.(*segInfo)
cur := segInfo.Assign(ts, count)
count -= cur
if cur > 0 {
result[segInfo.segID] += cur
}
}
return result, nil
}
func (info *assignInfo) IsActive(now time.Time) bool {
return now.Sub(info.lastInsertTime) <= ActiveTimeDuration*time.Second
}
type SegIDAssigner struct {
Allocator
assignInfos map[string]*list.List // collectionName -> *list.List
segReqs []*datapb.SegIDRequest
getTickFunc func() Timestamp
PeerID UniqueID
serviceClient DataServiceClient
countPerRPC uint32
}
func NewSegIDAssigner(ctx context.Context, client DataServiceClient, getTickFunc func() Timestamp) (*SegIDAssigner, error) {
ctx1, cancel := context.WithCancel(ctx)
sa := &SegIDAssigner{
Allocator: Allocator{
Ctx: ctx1,
CancelFunc: cancel,
},
countPerRPC: SegCountPerRPC,
serviceClient: client,
assignInfos: make(map[string]*list.List),
getTickFunc: getTickFunc,
}
sa.TChan = &allocator.Ticker{
UpdateInterval: time.Second,
}
sa.Allocator.SyncFunc = sa.syncSegments
sa.Allocator.ProcessFunc = sa.processFunc
sa.Allocator.CheckSyncFunc = sa.checkSyncFunc
sa.Allocator.PickCanDoFunc = sa.pickCanDoFunc
sa.Init()
return sa, nil
}
func (sa *SegIDAssigner) SetServiceClient(client DataServiceClient) {
sa.serviceClient = client
}
func (sa *SegIDAssigner) collectExpired() {
ts := sa.getTickFunc()
//now := time.Now()
for _, info := range sa.assignInfos {
for e := info.Front(); e != nil; e = e.Next() {
assign := e.Value.(*assignInfo)
assign.RemoveExpired(ts)
if assign.Capacity(ts) == 0 {
info.Remove(e)
}
}
}
}
func (sa *SegIDAssigner) pickCanDoFunc() {
if sa.ToDoReqs == nil {
return
}
records := make(map[string]map[string]map[int32]uint32)
newTodoReqs := sa.ToDoReqs[0:0]
for _, req := range sa.ToDoReqs {
segRequest := req.(*segRequest)
colName := segRequest.colName
partitionName := segRequest.partitionName
channelID := segRequest.channelID
if _, ok := records[colName]; !ok {
records[colName] = make(map[string]map[int32]uint32)
}
if _, ok := records[colName][partitionName]; !ok {
records[colName][partitionName] = make(map[int32]uint32)
}
if _, ok := records[colName][partitionName][channelID]; !ok {
records[colName][partitionName][channelID] = 0
}
records[colName][partitionName][channelID] += segRequest.count
assign := sa.getAssign(segRequest.colName, segRequest.partitionName, segRequest.channelID)
if assign == nil || assign.Capacity(segRequest.timestamp) < records[colName][partitionName][channelID] {
partitionID, _ := typeutil.Hash32String(segRequest.colName)
sa.segReqs = append(sa.segReqs, &datapb.SegIDRequest{
ChannelName: strconv.FormatUint(uint64(segRequest.channelID), 10),
Count: segRequest.count,
CollName: segRequest.colName,
PartitionName: segRequest.partitionName,
CollectionID: 0,
PartitionID: partitionID,
})
newTodoReqs = append(newTodoReqs, req)
} else {
sa.CanDoReqs = append(sa.CanDoReqs, req)
}
}
sa.ToDoReqs = newTodoReqs
}
func (sa *SegIDAssigner) getAssign(colName, partitionName string, channelID int32) *assignInfo {
assignInfos, ok := sa.assignInfos[colName]
if !ok {
return nil
}
for e := assignInfos.Front(); e != nil; e = e.Next() {
info := e.Value.(*assignInfo)
if info.partitionName != partitionName || info.channelID != channelID {
continue
}
return info
}
return nil
}
func (sa *SegIDAssigner) checkSyncFunc(timeout bool) bool {
sa.collectExpired()
return timeout || len(sa.segReqs) != 0
}
func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegIDRequest) bool {
if req1 == nil || req2 == nil {
return false
}
if req1 == req2 {
return true
}
return req1.CollName == req2.CollName && req1.PartitionName == req2.PartitionName && req1.ChannelName == req2.ChannelName
}
func (sa *SegIDAssigner) reduceSegReqs() {
if len(sa.segReqs) == 0 {
return
}
var newSegReqs []*datapb.SegIDRequest
for _, req1 := range sa.segReqs {
var req2 *datapb.SegIDRequest
for _, req3 := range newSegReqs {
if sa.checkSegReqEqual(req1, req3) {
req2 = req3
break
}
}
if req2 == nil { // not found
newSegReqs = append(newSegReqs, req1)
} else {
req2.Count += req1.Count
}
}
for _, req := range newSegReqs {
if req.Count == 0 {
req.Count = sa.countPerRPC
}
}
sa.segReqs = newSegReqs
}
func (sa *SegIDAssigner) syncSegments() bool {
if len(sa.segReqs) == 0 {
return true
}
sa.reduceSegReqs()
_, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req := &datapb.AssignSegIDRequest{
NodeID: sa.PeerID,
PeerRole: typeutil.ProxyNodeRole,
SegIDRequests: sa.segReqs,
}
sa.segReqs = []*datapb.SegIDRequest{}
resp, err := sa.serviceClient.AssignSegmentID(req)
if err != nil {
log.Println("GRPC AssignSegmentID Failed", resp, err)
return false
}
now := time.Now()
success := false
for _, info := range resp.SegIDAssignments {
if info.Status.GetErrorCode() != commonpb.ErrorCode_SUCCESS {
log.Println("SyncSegment Error:", info.Status.Reason)
continue
}
// FIXME: use channelName
channel, err := strconv.Atoi(info.ChannelName)
if err != nil {
return false
}
assign := sa.getAssign(info.CollName, info.PartitionName, int32(channel))
segInfo := &segInfo{
segID: info.SegID,
count: info.Count,
expireTime: info.ExpireTime,
}
if assign == nil {
colInfos, ok := sa.assignInfos[info.CollName]
if !ok {
colInfos = list.New()
}
segInfos := list.New()
segInfos.PushBack(segInfo)
assign = &assignInfo{
collID: info.CollectionID,
partitionID: info.PartitionID,
channelID: int32(channel),
segInfos: segInfos,
partitionName: info.PartitionName,
collName: info.CollName,
}
colInfos.PushBack(assign)
sa.assignInfos[info.CollName] = colInfos
} else {
assign.segInfos.PushBack(segInfo)
}
assign.lastInsertTime = now
success = true
}
return success
}
func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
segRequest := req.(*segRequest)
assign := sa.getAssign(segRequest.colName, segRequest.partitionName, segRequest.channelID)
if assign == nil {
return errors.New("Failed to GetSegmentID")
}
result, err := assign.Assign(segRequest.timestamp, segRequest.count)
segRequest.segInfo = result
return err
}
func (sa *SegIDAssigner) GetSegmentID(colName, partitionName string, channelID int32, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
req := &segRequest{
BaseRequest: allocator.BaseRequest{Done: make(chan error), Valid: false},
colName: colName,
partitionName: partitionName,
channelID: channelID,
count: count,
timestamp: ts,
}
sa.Reqs <- req
req.Wait()
if !req.IsValid() {
return nil, errors.New("GetSegmentID Failed")
}
return req.segInfo, nil
}