package proxy import ( "context" "errors" "fmt" "strconv" "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/tsoutil" "go.uber.org/zap" ) const ( GetCollectionStatisticsTaskName = "GetCollectionStatisticsTask" GetPartitionStatisticsTaskName = "GetPartitionStatisticsTask" ) type getStatisticsTask struct { request *milvuspb.GetStatisticsRequest result *milvuspb.GetStatisticsResponse Condition collectionName string partitionNames []string // partition ids that are loaded into query node, require get statistics from QueryNode loadedPartitionIDs []UniqueID // partition ids that are not loaded into query node, require get statistics from DataCoord unloadedPartitionIDs []UniqueID ctx context.Context dc types.DataCoord tr *timerecord.TimeRecorder toReduceResults []*internalpb.GetStatisticsResponse fromDataCoord bool fromQueryNode bool // if query from shard *internalpb.GetStatisticsRequest qc types.QueryCoord resultBuf chan *internalpb.GetStatisticsResponse statisticShardPolicy pickShardPolicy shardMgr *shardClientMgr } func (g *getStatisticsTask) TraceCtx() context.Context { return g.ctx } func (g *getStatisticsTask) ID() UniqueID { return g.Base.MsgID } func (g *getStatisticsTask) SetID(uid UniqueID) { g.Base.MsgID = uid } func (g *getStatisticsTask) Name() string { return GetPartitionStatisticsTaskName } func (g *getStatisticsTask) Type() commonpb.MsgType { return g.Base.MsgType } func (g *getStatisticsTask) BeginTs() Timestamp { return g.Base.Timestamp } func (g *getStatisticsTask) EndTs() Timestamp { return g.Base.Timestamp } func (g *getStatisticsTask) SetTs(ts Timestamp) { g.Base.Timestamp = ts } func (g *getStatisticsTask) OnEnqueue() error { g.GetStatisticsRequest = &internalpb.GetStatisticsRequest{ Base: commonpbutil.NewMsgBase(), } return nil } func (g *getStatisticsTask) PreExecute(ctx context.Context) error { g.DbID = 0 g.collectionName = g.request.GetCollectionName() g.partitionNames = g.request.GetPartitionNames() // g.TravelTimestamp = g.request.GetTravelTimestamp() g.GuaranteeTimestamp = g.request.GetGuaranteeTimestamp() sp, ctx := trace.StartSpanFromContextWithOperationName(g.TraceCtx(), "Proxy-GetStatistics-PreExecute") defer sp.Finish() if g.statisticShardPolicy == nil { g.statisticShardPolicy = mergeRoundRobinPolicy } // TODO: Maybe we should create a new MsgType: GetStatistics? g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics g.Base.SourceID = paramtable.GetNodeID() collID, err := globalMetaCache.GetCollectionID(ctx, g.collectionName) if err != nil { // err is not nil if collection not exists return err } partIDs, err := getPartitionIDs(ctx, g.collectionName, g.partitionNames) if err != nil { // err is not nil if partition not exists return err } g.GetStatisticsRequest.DbID = 0 // todo g.GetStatisticsRequest.CollectionID = collID if g.TravelTimestamp == 0 { g.TravelTimestamp = g.BeginTs() } err = validateTravelTimestamp(g.TravelTimestamp, g.BeginTs()) if err != nil { return err } g.GuaranteeTimestamp = parseGuaranteeTs(g.GuaranteeTimestamp, g.BeginTs()) deadline, ok := g.TraceCtx().Deadline() if ok { g.TimeoutTimestamp = tsoutil.ComposeTSByTime(deadline, 0) } // check if collection/partitions are loaded into query node loaded, unloaded, err := checkFullLoaded(ctx, g.qc, g.collectionName, partIDs) if err != nil { g.fromDataCoord = true g.unloadedPartitionIDs = partIDs log.Debug("checkFullLoaded failed, try get statistics from DataCoord", zap.Int64("msgID", g.ID()), zap.Error(err)) return nil } if len(unloaded) > 0 { g.fromDataCoord = true g.unloadedPartitionIDs = unloaded log.Debug("some partitions has not been loaded, try get statistics from DataCoord", zap.Int64("msgID", g.ID()), zap.String("collection", g.collectionName), zap.Int64s("unloaded partitions", unloaded), zap.Error(err)) } if len(loaded) > 0 { g.fromQueryNode = true g.loadedPartitionIDs = loaded log.Debug("some partitions has been loaded, try get statistics from QueryNode", zap.Int64("msgID", g.ID()), zap.String("collection", g.collectionName), zap.Int64s("loaded partitions", loaded), zap.Error(err)) } return nil } func (g *getStatisticsTask) Execute(ctx context.Context) error { sp, ctx := trace.StartSpanFromContextWithOperationName(g.TraceCtx(), "Proxy-GetStatistics-Execute") defer sp.Finish() if g.fromQueryNode { // if request get statistics of collection which is full loaded into query node // then we need not pass partition ids params if len(g.request.GetPartitionNames()) == 0 && len(g.unloadedPartitionIDs) == 0 { g.loadedPartitionIDs = []UniqueID{} } err := g.getStatisticsFromQueryNode(ctx) if err != nil { return err } log.Debug("get collection statistics from QueryNode execute done", zap.Int64("msgID", g.ID())) } if g.fromDataCoord { err := g.getStatisticsFromDataCoord(ctx) if err != nil { return err } log.Debug("get collection statistics from DataCoord execute done", zap.Int64("msgID", g.ID())) } return nil } func (g *getStatisticsTask) PostExecute(ctx context.Context) error { sp, _ := trace.StartSpanFromContextWithOperationName(g.TraceCtx(), "Proxy-GetStatistic-PostExecute") defer sp.Finish() tr := timerecord.NewTimeRecorder("getStatisticTask PostExecute") defer func() { tr.Elapse("done") }() if g.fromQueryNode { select { case <-g.TraceCtx().Done(): log.Debug("wait to finish timeout!", zap.Int64("msgID", g.ID())) return nil default: log.Debug("all get statistics are finished or canceled", zap.Int64("msgID", g.ID())) close(g.resultBuf) for res := range g.resultBuf { g.toReduceResults = append(g.toReduceResults, res) log.Debug("proxy receives one get statistic response", zap.Int64("sourceID", res.GetBase().GetSourceID()), zap.Int64("msgID", g.ID())) } } } validResults, err := decodeGetStatisticsResults(g.toReduceResults) if err != nil { return err } result, err := reduceStatisticResponse(validResults) if err != nil { return err } g.result = &milvuspb.GetStatisticsResponse{ Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, Stats: result, } log.Info("get statistics post execute done", zap.Int64("msgID", g.ID()), zap.Any("result", result)) return nil } func (g *getStatisticsTask) getStatisticsFromDataCoord(ctx context.Context) error { collID := g.CollectionID partIDs := g.unloadedPartitionIDs req := &datapb.GetPartitionStatisticsRequest{ Base: commonpbutil.UpdateMsgBase( g.Base, commonpbutil.WithMsgType(commonpb.MsgType_GetPartitionStatistics), ), CollectionID: collID, PartitionIDs: partIDs, } result, err := g.dc.GetPartitionStatistics(ctx, req) if err != nil { return err } if result.Status.ErrorCode != commonpb.ErrorCode_Success { return errors.New(result.Status.Reason) } g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{ Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, Stats: result.Stats, }) return nil } func (g *getStatisticsTask) getStatisticsFromQueryNode(ctx context.Context) error { g.GetStatisticsRequest.PartitionIDs = g.loadedPartitionIDs executeGetStatistics := func(withCache bool) error { shard2Leaders, err := globalMetaCache.GetShards(ctx, withCache, g.collectionName) if err != nil { return err } g.resultBuf = make(chan *internalpb.GetStatisticsResponse, len(shard2Leaders)) if err := g.statisticShardPolicy(ctx, g.shardMgr, g.getStatisticsShard, shard2Leaders); err != nil { log.Warn("failed to get statistics", zap.Int64("msgID", g.ID()), zap.Error(err), zap.String("Shards", fmt.Sprintf("%v", shard2Leaders))) return err } return nil } err := executeGetStatistics(WithCache) if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) { log.Warn("first get statistics failed, updating shard leader caches and retry", zap.Int64("msgID", g.ID()), zap.Error(err)) err = executeGetStatistics(WithoutCache) } if err != nil { return fmt.Errorf("fail to get statistics on all shard leaders, err=%w", err) } return nil } func (g *getStatisticsTask) getStatisticsShard(ctx context.Context, nodeID int64, qn types.QueryNode, channelIDs []string) error { req := &querypb.GetStatisticsRequest{ Req: g.GetStatisticsRequest, DmlChannels: channelIDs, Scope: querypb.DataScope_All, } result, err := qn.GetStatistics(ctx, req) if err != nil { log.Warn("QueryNode statistic return error", zap.Int64("msgID", g.ID()), zap.Int64("nodeID", nodeID), zap.Strings("channels", channelIDs), zap.Error(err)) return err } if result.GetStatus().GetErrorCode() == commonpb.ErrorCode_NotShardLeader { log.Warn("QueryNode is not shardLeader", zap.Int64("msgID", g.ID()), zap.Int64("nodeID", nodeID), zap.Strings("channels", channelIDs)) return errInvalidShardLeaders } if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("QueryNode statistic result error", zap.Int64("msgID", g.ID()), zap.Int64("nodeID", nodeID), zap.String("reason", result.GetStatus().GetReason())) return fmt.Errorf("fail to get statistic, QueryNode ID=%d, reason=%s", nodeID, result.GetStatus().GetReason()) } g.resultBuf <- result return nil } // checkFullLoaded check if collection / partition was fully loaded into QueryNode // return loaded partitions, unloaded partitions and error func checkFullLoaded(ctx context.Context, qc types.QueryCoord, collectionName string, searchPartitionIDs []UniqueID) ([]UniqueID, []UniqueID, error) { var loadedPartitionIDs []UniqueID var unloadPartitionIDs []UniqueID // TODO: Consider to check if partition loaded from cache to save rpc. info, err := globalMetaCache.GetCollectionInfo(ctx, collectionName) if err != nil { return nil, nil, fmt.Errorf("GetCollectionInfo failed, collection = %s, err = %s", collectionName, err) } // If request to search partitions if len(searchPartitionIDs) > 0 { resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), CollectionID: info.collID, PartitionIDs: searchPartitionIDs, }) if err != nil { return nil, nil, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, err = %s", collectionName, searchPartitionIDs, err) } if resp.Status.ErrorCode != commonpb.ErrorCode_Success { return nil, nil, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, reason = %s", collectionName, searchPartitionIDs, resp.GetStatus().GetReason()) } for i, percentage := range resp.GetInMemoryPercentages() { if percentage >= 100 { loadedPartitionIDs = append(loadedPartitionIDs, resp.GetPartitionIDs()[i]) } else { unloadPartitionIDs = append(unloadPartitionIDs, resp.GetPartitionIDs()[i]) } } return loadedPartitionIDs, unloadPartitionIDs, nil } // If request to search collection resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), CollectionID: info.collID, }) if err != nil { return nil, nil, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, err = %s", collectionName, searchPartitionIDs, err) } if resp.Status.ErrorCode != commonpb.ErrorCode_Success { return nil, nil, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, reason = %s", collectionName, searchPartitionIDs, resp.GetStatus().GetReason()) } loadedMap := make(map[UniqueID]bool) for i, percentage := range resp.GetInMemoryPercentages() { if percentage >= 100 { loadedMap[resp.GetPartitionIDs()[i]] = true loadedPartitionIDs = append(loadedPartitionIDs, resp.GetPartitionIDs()[i]) } } for _, partInfo := range info.partInfo { if _, ok := loadedMap[partInfo.partitionID]; !ok { unloadPartitionIDs = append(unloadPartitionIDs, partInfo.partitionID) } } return loadedPartitionIDs, unloadPartitionIDs, nil } func decodeGetStatisticsResults(results []*internalpb.GetStatisticsResponse) ([]map[string]string, error) { ret := make([]map[string]string, len(results)) for i, result := range results { if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return nil, fmt.Errorf("fail to decode result, reason=%s", result.GetStatus().GetReason()) } ret[i] = funcutil.KeyValuePair2Map(result.GetStats()) } return ret, nil } func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValuePair, error) { mergedResults := map[string]interface{}{ "row_count": int64(0), } fieldMethod := map[string]func(string) error{ "row_count": func(str string) error { count, err := strconv.ParseInt(str, 10, 64) if err != nil { return err } mergedResults["row_count"] = mergedResults["row_count"].(int64) + count return nil }, } err := funcutil.MapReduce(results, fieldMethod) stringMap := make(map[string]string) for k, v := range mergedResults { stringMap[k] = fmt.Sprint(v) } return funcutil.Map2KeyValuePair(stringMap), err } // implement Task // try to compatible with old API (getCollectionStatistics & getPartitionStatistics) //type getPartitionStatisticsTask struct { // getStatisticsTask // request *milvuspb.GetPartitionStatisticsRequest // result *milvuspb.GetPartitionStatisticsResponse //} // //func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error { // g.getStatisticsTask.DbID = 0 // g.getStatisticsTask.collectionName = g.request.GetCollectionName() // g.getStatisticsTask.partitionNames = []string{g.request.GetPartitionName()} // // g.TravelTimestamp = g.request.GetTravelTimestamp() // // g.GuaranteeTimestamp = g.request.GetGuaranteeTimestamp() // return g.getStatisticsTask.PreExecute(ctx) //} // //func (g *getPartitionStatisticsTask) Execute(ctx context.Context) error { // if g.fromQueryNode { // err := g.getStatisticsTask.Execute(ctx) // if err != nil { // return err // } // log.Debug("get partition statistics from QueryNode execute done", zap.Int64("msgID", g.ID())) // } // if g.fromDataCoord { // collID := g.CollectionID // partIDs := g.unloadedPartitionIDs // // req := &datapb.GetPartitionStatisticsRequest{ // Base: &commonpb.MsgBase{ // MsgType: commonpb.MsgType_GetPartitionStatistics, // MsgID: g.Base.MsgID, // Timestamp: g.Base.Timestamp, // SourceID: g.Base.SourceID, // }, // CollectionID: collID, // PartitionIDs: partIDs, // } // // result, err := g.dc.GetPartitionStatistics(ctx, req) // if err != nil { // return err // } // if result.Status.ErrorCode != commonpb.ErrorCode_Success { // return errors.New(result.Status.Reason) // } // g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{ // Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, // Stats: result.Stats, // }) // log.Debug("get partition statistics from DataCoord execute done", zap.Int64("msgID", g.ID())) // return nil // } // return nil //} // //func (g *getPartitionStatisticsTask) PostExecute(ctx context.Context) error { // err := g.getStatisticsTask.PostExecute(ctx) // if err != nil { // return err // } // g.result = &milvuspb.GetPartitionStatisticsResponse{ // Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, // Stats: g.innerResult, // } // return nil //} // //type getCollectionStatisticsTask struct { // getStatisticsTask // request *milvuspb.GetCollectionStatisticsRequest // result *milvuspb.GetCollectionStatisticsResponse //} // //func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error { // g.getStatisticsTask.DbID = 0 // g.getStatisticsTask.collectionName = g.request.GetCollectionName() // g.getStatisticsTask.partitionNames = []string{} // // g.TravelTimestamp = g.request.GetTravelTimestamp() // // g.GuaranteeTimestamp = g.request.GetGuaranteeTimestamp() // return g.getStatisticsTask.PreExecute(ctx) //} // //func (g *getCollectionStatisticsTask) Execute(ctx context.Context) error { // if g.fromQueryNode { // // if you get entire collection, we need to pass partition ids param. // if len(g.unloadedPartitionIDs) == 0 { // g.GetStatisticsRequest.PartitionIDs = nil // } // err := g.getStatisticsTask.Execute(ctx) // if err != nil { // return err // } // log.Debug("get collection statistics from QueryNode execute done", zap.Int64("msgID", g.ID())) // } // if g.fromDataCoord { // collID := g.CollectionID // partIDs := g.unloadedPartitionIDs // // // all collection has not been loaded, get statistics from datacoord // if len(g.GetStatisticsRequest.PartitionIDs) == 0 { // req := &datapb.GetCollectionStatisticsRequest{ // Base: &commonpb.MsgBase{ // MsgType: commonpb.MsgType_GetCollectionStatistics, // MsgID: g.Base.MsgID, // Timestamp: g.Base.Timestamp, // SourceID: g.Base.SourceID, // }, // CollectionID: collID, // } // // result, err := g.dc.GetCollectionStatistics(ctx, req) // if err != nil { // return err // } // if result.Status.ErrorCode != commonpb.ErrorCode_Success { // return errors.New(result.Status.Reason) // } // g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{ // Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, // Stats: result.Stats, // }) // } else { // some partitions have been loaded, get some partition statistics from datacoord // req := &datapb.GetPartitionStatisticsRequest{ // Base: &commonpb.MsgBase{ // MsgType: commonpb.MsgType_GetPartitionStatistics, // MsgID: g.Base.MsgID, // Timestamp: g.Base.Timestamp, // SourceID: g.Base.SourceID, // }, // CollectionID: collID, // PartitionIDs: partIDs, // } // // result, err := g.dc.GetPartitionStatistics(ctx, req) // if err != nil { // return err // } // if result.Status.ErrorCode != commonpb.ErrorCode_Success { // return errors.New(result.Status.Reason) // } // g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{ // Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, // Stats: result.Stats, // }) // } // log.Debug("get collection statistics from DataCoord execute done", zap.Int64("msgID", g.ID())) // return nil // } // return nil //} // //func (g *getCollectionStatisticsTask) PostExecute(ctx context.Context) error { // err := g.getStatisticsTask.PostExecute(ctx) // if err != nil { // return err // } // g.result = &milvuspb.GetCollectionStatisticsResponse{ // Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, // Stats: g.innerResult, // } // return nil //} // old version of get statistics // please remove it after getStatisticsTask below is stable type getCollectionStatisticsTask struct { Condition *milvuspb.GetCollectionStatisticsRequest ctx context.Context dataCoord types.DataCoord result *milvuspb.GetCollectionStatisticsResponse collectionID UniqueID } func (g *getCollectionStatisticsTask) TraceCtx() context.Context { return g.ctx } func (g *getCollectionStatisticsTask) ID() UniqueID { return g.Base.MsgID } func (g *getCollectionStatisticsTask) SetID(uid UniqueID) { g.Base.MsgID = uid } func (g *getCollectionStatisticsTask) Name() string { return GetCollectionStatisticsTaskName } func (g *getCollectionStatisticsTask) Type() commonpb.MsgType { return g.Base.MsgType } func (g *getCollectionStatisticsTask) BeginTs() Timestamp { return g.Base.Timestamp } func (g *getCollectionStatisticsTask) EndTs() Timestamp { return g.Base.Timestamp } func (g *getCollectionStatisticsTask) SetTs(ts Timestamp) { g.Base.Timestamp = ts } func (g *getCollectionStatisticsTask) OnEnqueue() error { g.Base = commonpbutil.NewMsgBase() return nil } func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error { g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics g.Base.SourceID = paramtable.GetNodeID() return nil } func (g *getCollectionStatisticsTask) Execute(ctx context.Context) error { collID, err := globalMetaCache.GetCollectionID(ctx, g.CollectionName) if err != nil { return err } g.collectionID = collID req := &datapb.GetCollectionStatisticsRequest{ Base: commonpbutil.UpdateMsgBase( g.Base, commonpbutil.WithMsgType(commonpb.MsgType_GetCollectionStatistics), ), CollectionID: collID, } result, err := g.dataCoord.GetCollectionStatistics(ctx, req) if err != nil { return err } if result.Status.ErrorCode != commonpb.ErrorCode_Success { return errors.New(result.Status.Reason) } g.result = &milvuspb.GetCollectionStatisticsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, Stats: result.Stats, } return nil } func (g *getCollectionStatisticsTask) PostExecute(ctx context.Context) error { return nil } type getPartitionStatisticsTask struct { Condition *milvuspb.GetPartitionStatisticsRequest ctx context.Context dataCoord types.DataCoord result *milvuspb.GetPartitionStatisticsResponse collectionID UniqueID } func (g *getPartitionStatisticsTask) TraceCtx() context.Context { return g.ctx } func (g *getPartitionStatisticsTask) ID() UniqueID { return g.Base.MsgID } func (g *getPartitionStatisticsTask) SetID(uid UniqueID) { g.Base.MsgID = uid } func (g *getPartitionStatisticsTask) Name() string { return GetPartitionStatisticsTaskName } func (g *getPartitionStatisticsTask) Type() commonpb.MsgType { return g.Base.MsgType } func (g *getPartitionStatisticsTask) BeginTs() Timestamp { return g.Base.Timestamp } func (g *getPartitionStatisticsTask) EndTs() Timestamp { return g.Base.Timestamp } func (g *getPartitionStatisticsTask) SetTs(ts Timestamp) { g.Base.Timestamp = ts } func (g *getPartitionStatisticsTask) OnEnqueue() error { g.Base = commonpbutil.NewMsgBase() return nil } func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error { g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics g.Base.SourceID = paramtable.GetNodeID() return nil } func (g *getPartitionStatisticsTask) Execute(ctx context.Context) error { collID, err := globalMetaCache.GetCollectionID(ctx, g.CollectionName) if err != nil { return err } g.collectionID = collID partitionID, err := globalMetaCache.GetPartitionID(ctx, g.CollectionName, g.PartitionName) if err != nil { return err } req := &datapb.GetPartitionStatisticsRequest{ Base: commonpbutil.UpdateMsgBase( g.Base, commonpbutil.WithMsgType(commonpb.MsgType_GetCollectionStatistics), ), CollectionID: collID, PartitionIDs: []int64{partitionID}, } result, _ := g.dataCoord.GetPartitionStatistics(ctx, req) if result == nil { return errors.New("get partition statistics resp is nil") } if result.Status.ErrorCode != commonpb.ErrorCode_Success { return errors.New(result.Status.Reason) } g.result = &milvuspb.GetPartitionStatisticsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, Stats: result.Stats, } return nil } func (g *getPartitionStatisticsTask) PostExecute(ctx context.Context) error { return nil }