mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 12:29:36 +08:00
Refactor code
1. add error as return value 2. check assertion success Signed-off-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
parent
fb000d0060
commit
6dc938e6f4
@ -76,7 +76,11 @@ func (info *segInfo) Assign(ts Timestamp, count uint32) uint32 {
|
||||
|
||||
func (info *assignInfo) RemoveExpired(ts Timestamp) {
|
||||
for e := info.segInfos.Front(); e != nil; e = e.Next() {
|
||||
segInfo := e.Value.(*segInfo)
|
||||
segInfo, ok := e.Value.(*segInfo)
|
||||
if !ok {
|
||||
log.Printf("can not cast to segInfo")
|
||||
continue
|
||||
}
|
||||
if segInfo.IsExpired(ts) {
|
||||
info.segInfos.Remove(e)
|
||||
}
|
||||
@ -190,8 +194,8 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
|
||||
}
|
||||
|
||||
records[collID][partitionID][channelName] += segRequest.count
|
||||
assign := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
|
||||
if assign == nil || assign.Capacity(segRequest.timestamp) < records[collID][partitionID][channelName] {
|
||||
assign, err := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
|
||||
if err != nil || assign.Capacity(segRequest.timestamp) < records[collID][partitionID][channelName] {
|
||||
sa.segReqs = append(sa.segReqs, &datapb.SegIDRequest{
|
||||
ChannelName: channelName,
|
||||
Count: segRequest.count,
|
||||
@ -206,10 +210,10 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
|
||||
sa.ToDoReqs = newTodoReqs
|
||||
}
|
||||
|
||||
func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channelName string) *assignInfo {
|
||||
func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channelName string) (*assignInfo, error) {
|
||||
assignInfos, ok := sa.assignInfos[collID]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, fmt.Errorf("can not find collection %d", collID)
|
||||
}
|
||||
|
||||
for e := assignInfos.Front(); e != nil; e = e.Next() {
|
||||
@ -217,9 +221,10 @@ func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channe
|
||||
if info.partitionID != partitionID || info.channelName != channelName {
|
||||
continue
|
||||
}
|
||||
return info
|
||||
return info, nil
|
||||
}
|
||||
return nil
|
||||
return nil, fmt.Errorf("can not find assign info with collID %d, partitionID %d, channelName %s",
|
||||
collID, partitionID, channelName)
|
||||
}
|
||||
|
||||
func (sa *SegIDAssigner) checkSyncFunc(timeout bool) bool {
|
||||
@ -296,13 +301,13 @@ func (sa *SegIDAssigner) syncSegments() bool {
|
||||
log.Println("SyncSegment Error:", info.Status.Reason)
|
||||
continue
|
||||
}
|
||||
assign := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName)
|
||||
assign, err := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName)
|
||||
segInfo := &segInfo{
|
||||
segID: info.SegID,
|
||||
count: info.Count,
|
||||
expireTime: info.ExpireTime,
|
||||
}
|
||||
if assign == nil {
|
||||
if err != nil {
|
||||
colInfos, ok := sa.assignInfos[info.CollectionID]
|
||||
if !ok {
|
||||
colInfos = list.New()
|
||||
@ -329,9 +334,9 @@ func (sa *SegIDAssigner) syncSegments() bool {
|
||||
|
||||
func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
|
||||
segRequest := req.(*segRequest)
|
||||
assign := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
|
||||
if assign == nil {
|
||||
return errors.New("Failed to GetSegmentID")
|
||||
assign, err := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result, err := assign.Assign(segRequest.timestamp, segRequest.count)
|
||||
segRequest.segInfo = result
|
||||
|
Loading…
Reference in New Issue
Block a user