Mirror of https://gitee.com/milvus-io/milvus.git (synced 2024-12-01 11:29:48 +08:00)
Fix txn limit error when querycoord remove task from etcd (#12084)
Signed-off-by: xige-16 <xi.ge@zilliz.com>
Parent: 8f40450d93
Commit: d666c2a190
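etcd rejects a transaction whose operation count exceeds the server's --max-txn-ops limit (128 by default). Before this fix, removeTaskFromKVFn collected the trigger task's keys plus two keys per child task and deleted them all with a single MultiRemove call, so a trigger task with enough child tasks pushed one transaction past that limit. The commit's own approach, visible in the last hunk below, is to issue a small MultiRemove per child task. As a general alternative, here is a minimal sketch (not the commit's code; multiRemove, removeInBatches, and the batch size are illustrative assumptions) of chunking a flat key list so no single transaction exceeds the limit:

package main

import "fmt"

// multiRemove stands in for a kv client's MultiRemove; here it only reports the
// size of each transaction.
func multiRemove(keys []string) error {
	fmt.Printf("one txn removes %d keys\n", len(keys))
	return nil
}

// removeInBatches deletes keys in chunks of at most batchSize so no single etcd
// transaction exceeds the server's operation limit.
func removeInBatches(keys []string, batchSize int) error {
	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		if err := multiRemove(keys[start:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// A trigger task with 200 child tasks produces 2 + 2*200 = 402 keys, well past
	// a 128-op limit if removed in one transaction.
	keys := make([]string, 0, 402)
	for i := 0; i < 402; i++ {
		keys = append(keys, fmt.Sprintf("task/%d", i))
	}
	_ = removeInBatches(keys, 128)
}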
@@ -222,6 +222,7 @@ func (c *queryNodeCluster) reloadFromKV() error {
				log.Debug("ReloadFromKV: failed to add queryNode meta to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
				return err
			}
			log.Debug("ReloadFromKV: reload collection info from etcd", zap.Any("info", collectionInfo))
		}
	}
	return nil
@@ -171,6 +171,7 @@ func (qn *queryNode) addCollection(collectionID UniqueID, schema *schemapb.Colle
			log.Error("AddCollection: save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
			return err
		}
		log.Debug("queryNode addCollection", zap.Int64("nodeID", qn.id), zap.Any("collectionInfo", newCollection))
	}

	return nil
@@ -215,6 +216,7 @@ func (qn *queryNode) addPartition(collectionID UniqueID, partitionID UniqueID) e
		if err != nil {
			log.Error("AddPartition: save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
		}
		log.Debug("queryNode add partition", zap.Int64("nodeID", qn.id), zap.Any("collectionInfo", col))
		return nil
	}
	return errors.New("AddPartition: can't find collection when add partition")
@@ -255,11 +257,12 @@ func (qn *queryNode) releasePartitionsInfo(collectionID UniqueID, partitionIDs [
			}
		}
		info.PartitionIDs = newPartitionIDs
		err := removeNodeCollectionInfo(collectionID, qn.id, qn.kvClient)
		err := saveNodeCollectionInfo(collectionID, info, qn.id, qn.kvClient)
		if err != nil {
			log.Error("ReleasePartitionsInfo: remove collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
			return err
		}
		log.Debug("queryNode release partition info", zap.Int64("nodeID", qn.id), zap.Any("info", info))
	}

	return nil
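The releasePartitionsInfo hunk above replaces removeNodeCollectionInfo (the old line) with saveNodeCollectionInfo (the new line): when only some partitions are released, the node's collection entry in etcd is kept and rewritten with the shrunken PartitionIDs list instead of being deleted. A tiny sketch of that filter-then-persist step, with assumed types (collectionInfo and releasePartitions here are illustrative, not the actual Milvus helpers):

package main

import "fmt"

type collectionInfo struct {
	CollectionID int64
	PartitionIDs []int64
}

// releasePartitions drops the released partition IDs from info and returns the
// updated struct, which the caller would then save back (not delete) in etcd.
func releasePartitions(info collectionInfo, released []int64) collectionInfo {
	releasedSet := make(map[int64]struct{}, len(released))
	for _, id := range released {
		releasedSet[id] = struct{}{}
	}
	kept := make([]int64, 0, len(info.PartitionIDs))
	for _, id := range info.PartitionIDs {
		if _, ok := releasedSet[id]; !ok {
			kept = append(kept, id)
		}
	}
	info.PartitionIDs = kept
	return info
}

func main() {
	info := collectionInfo{CollectionID: 1, PartitionIDs: []int64{10, 11, 12}}
	fmt.Println(releasePartitions(info, []int64{11})) // {1 [10 12]}
}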
@@ -98,7 +98,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
	if len(reqs) == 0 {
		return nil
	}

	log.Debug("shuffleSegmentsToQueryNodeV2: start estimate the size of loadReqs")
	dataSizePerReq := make([]int64, 0)
	for _, req := range reqs {
		sizeOfReq, err := cluster.estimateSegmentsSize(req)
@@ -107,7 +107,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
		}
		dataSizePerReq = append(dataSizePerReq, sizeOfReq)
	}

	log.Debug("shuffleSegmentsToQueryNodeV2: estimate the size of loadReqs end")
	for {
		// online nodes map and totalMem, usedMem, memUsage of every node
		totalMem := make(map[int64]uint64)
@@ -143,6 +143,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
			// update totalMem, memUsage, memUsageRate
			totalMem[nodeID], memUsage[nodeID], memUsageRate[nodeID] = queryNodeInfo.totalMem, queryNodeInfo.memUsage, queryNodeInfo.memUsageRate
		}
		log.Debug("shuffleSegmentsToQueryNodeV2: num of availableNodes", zap.Int("size", len(availableNodes)))
		if len(availableNodes) > 0 {
			nodeIDSlice := make([]int64, 0, len(availableNodes))
			for nodeID := range availableNodes {
@@ -176,6 +177,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
			}

			if allocateSegmentsDone {
				log.Debug("shuffleSegmentsToQueryNodeV2: shuffle segment to query node success")
				return nil
			}
		}
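The shuffleSegmentsToQueryNodeV2 hunks above first estimate the size of every load request and then track each online node's totalMem, memUsage, and memUsageRate before assigning segments. A minimal sketch of that kind of memory-aware choice, under assumed types and names (nodeMem, pickNode, and maxUsageRate are illustrative, not Milvus's API): pick the node whose projected memory usage rate after taking the request stays lowest and below a cap.

package main

import (
	"errors"
	"fmt"
	"sort"
)

type nodeMem struct {
	id       int64
	totalMem uint64
	usedMem  uint64
}

// pickNode returns the node with the smallest memory usage rate after adding
// reqSize, skipping any node whose projected rate is not below maxUsageRate.
func pickNode(nodes []nodeMem, reqSize uint64, maxUsageRate float64) (int64, error) {
	// Sort by node ID so the choice is deterministic when rates are equal.
	sort.Slice(nodes, func(i, j int) bool { return nodes[i].id < nodes[j].id })
	bestID, bestRate := int64(-1), maxUsageRate
	for _, n := range nodes {
		if n.totalMem == 0 {
			continue
		}
		rate := float64(n.usedMem+reqSize) / float64(n.totalMem)
		if rate < bestRate {
			bestID, bestRate = n.id, rate
		}
	}
	if bestID < 0 {
		return 0, errors.New("no node has enough free memory for this request")
	}
	return bestID, nil
}

func main() {
	nodes := []nodeMem{{1, 8 << 30, 6 << 30}, {2, 8 << 30, 2 << 30}}
	id, err := pickNode(nodes, 1<<30, 0.9)
	fmt.Println(id, err) // expect node 2, which ends up far less loaded
}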
@@ -1718,6 +1718,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
			return err
		}

		log.Debug("loadBalanceTask: partitions to recover", zap.Int64s("partitionIDs", partitionIDs))
		for _, partitionID := range partitionIDs {
			getRecoveryInfo := &datapb.GetRecoveryInfoRequest{
				Base: &commonpb.MsgBase{
@@ -2079,6 +2080,7 @@ func assignInternalTask(ctx context.Context,
	wait bool, excludeNodeIDs []int64, includeNodeIDs []int64) ([]task, error) {
	sp, _ := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
	log.Debug("assignInternalTask: start assign task to query node")
	internalTasks := make([]task, 0)
	err := cluster.allocateSegmentsToQueryNode(ctx, loadSegmentRequests, wait, excludeNodeIDs, includeNodeIDs)
	if err != nil {
@@ -631,21 +631,27 @@ func (scheduler *TaskScheduler) scheduleLoop() {
	}

	removeTaskFromKVFn := func(triggerTask task) error {
		keys := make([]string, 0)
		taskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, triggerTask.getTaskID())
		stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, triggerTask.getTaskID())
		keys = append(keys, taskKey)
		keys = append(keys, stateKey)
		childTasks := triggerTask.getChildTask()
		for _, t := range childTasks {
			taskKey = fmt.Sprintf("%s/%d", activeTaskPrefix, t.getTaskID())
			stateKey = fmt.Sprintf("%s/%d", taskInfoPrefix, t.getTaskID())
			keys = append(keys, taskKey)
			keys = append(keys, stateKey)
			childTaskKeys := make([]string, 0)
			taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, t.getTaskID())
			stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.getTaskID())
			childTaskKeys = append(childTaskKeys, taskKey)
			childTaskKeys = append(childTaskKeys, stateKey)
			err := scheduler.client.MultiRemove(childTaskKeys)
			// after recovery, a child task's state will be TaskDone, so it will not be executed again
			if err != nil {
				panic(err)
			}
		}
		err := scheduler.client.MultiRemove(keys)
		triggerTaskKeys := make([]string, 0)
		taskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, triggerTask.getTaskID())
		stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, triggerTask.getTaskID())
		triggerTaskKeys = append(triggerTaskKeys, taskKey)
		triggerTaskKeys = append(triggerTaskKeys, stateKey)
		err := scheduler.client.MultiRemove(triggerTaskKeys)
		if err != nil {
			return err
			panic(err)
		}
		return nil
	}
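Reading old and new lines together, the reworked removeTaskFromKVFn appears to delete each child task's two keys (under activeTaskPrefix and taskInfoPrefix) in its own MultiRemove call, then the trigger task's two keys (under triggerTaskPrefix and taskInfoPrefix) in a final call, and it panics rather than returning when a removal fails, since after recovery a child task in TaskDone state will not run again. A self-contained sketch of that shape, with stub types and placeholder prefix values (kvStore, fakeKV, removeTaskFromKV, and the prefix strings are assumptions, not the real querycoord code):

package main

import "fmt"

// Placeholder prefixes; the real constants live in querycoord and may differ.
const (
	triggerTaskPrefix = "trigger-task"
	activeTaskPrefix  = "active-task"
	taskInfoPrefix    = "task-info"
)

// kvStore stands in for the etcd-backed kv client used by the scheduler.
type kvStore interface {
	MultiRemove(keys []string) error
}

// fakeKV prints what each transaction would remove.
type fakeKV struct{}

func (fakeKV) MultiRemove(keys []string) error {
	fmt.Println("one txn removes:", keys)
	return nil
}

// removeTaskFromKV mirrors the shape of the new removeTaskFromKVFn: two keys per
// child task in separate transactions, then the trigger task's two keys.
// (The commit panics on a failed removal; this sketch just returns the error.)
func removeTaskFromKV(client kvStore, triggerTaskID int64, childTaskIDs []int64) error {
	for _, id := range childTaskIDs {
		childKeys := []string{
			fmt.Sprintf("%s/%d", activeTaskPrefix, id),
			fmt.Sprintf("%s/%d", taskInfoPrefix, id),
		}
		if err := client.MultiRemove(childKeys); err != nil {
			return err
		}
	}
	triggerKeys := []string{
		fmt.Sprintf("%s/%d", triggerTaskPrefix, triggerTaskID),
		fmt.Sprintf("%s/%d", taskInfoPrefix, triggerTaskID),
	}
	return client.MultiRemove(triggerKeys)
}

func main() {
	_ = removeTaskFromKV(fakeKV{}, 7, []int64{8, 9, 10})
}

With this layout every transaction touches a constant two keys, so the operation count no longer grows with the number of child tasks.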