milvus/internal/proxy/task_policies.go
congqixia 69252f812d
Implement memory replica in Proxy, QueryNode and QueryCoord (#16470)
Related to #16298 #16291 #16154
Co-authored-by: sunby <bingyi.sun@zilliz.com>
Co-authored-by: yangxuan <xuan.yang@zilliz.com>
Co-authored-by: yah01 <yang.cen@zilliz.com>
Co-authored-by: Letian Jiang <letian.jiang@zilliz.com>

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2022-04-20 16:15:41 +08:00

82 lines
2.2 KiB
Go

package proxy
import (
"context"
"errors"
"fmt"
qnClient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
)
// getQueryNodePolicy is the signature of a function that, given a context and
// a QueryNode address, returns a QueryNode client for that address or an error.
// defaultGetQueryNodePolicy in this file is one implementation.
type getQueryNodePolicy func(context.Context, string) (types.QueryNode, error)

// pickShardPolicy is the signature of a function that runs query against the
// shard leaders listed in leaders, obtaining a QueryNode client for each
// attempt through policy. roundRobinPolicy in this file has this signature.
type pickShardPolicy func(ctx context.Context, policy getQueryNodePolicy, query func(UniqueID, types.QueryNode) error, leaders *querypb.ShardLeadersList) error
// defaultGetQueryNodePolicy creates, initializes and starts a brand-new
// QueryNode client for the given address on every call; nothing is cached or
// reused between calls.
//
// On success the started client is returned and the caller is expected to stop
// it when done (roundRobinPolicy does so). If creating the client fails, the
// error is returned as is. If Init or Start fails, the half-set-up client is
// stopped before the error is returned so that it is not leaked.
//
// TODO: add another policy to enable the use of a cache.
func defaultGetQueryNodePolicy(ctx context.Context, address string) (types.QueryNode, error) {
	qn, err := qnClient.NewClient(ctx, address)
	if err != nil {
		return nil, err
	}

	if err := qn.Init(); err != nil {
		// Best-effort cleanup: the Init error is what the caller needs to see,
		// so any failure while stopping is intentionally not reported.
		qn.Stop()
		return nil, err
	}

	if err := qn.Start(); err != nil {
		// Same best-effort cleanup as above for a client that failed to start.
		qn.Stop()
		return nil, err
	}

	return qn, nil
}
var (
// errBegin is the initial value of the error tracked by roundRobinPolicy
// before any QueryNode has been tried; it is the error reported when there
// are no shard leaders to try at all.
errBegin = errors.New("begin error")
// errInvalidShardLeaders is a sentinel error signalling an invalid shard
// leader. NOTE(review): how callers react to it is not visible in this
// part of the file - confirm before changing its message or identity.
errInvalidShardLeaders = errors.New("Invalid shard leader")
)
// roundRobinPolicy tries the shard leaders of a channel one at a time, in the
// order they are listed in leaders, and stops at the first one whose query
// succeeds.
//
// For every node ID, a QueryNode client is obtained through getQueryNodePolicy
// using the address at the same index in leaders.GetNodeAddrs(), query is
// called with the node ID and that client, and the client is stopped before
// the next node is tried.
//
// It returns nil as soon as one query succeeds. If every leader fails, or the
// list is empty, it returns an error naming the channel, the node IDs and the
// last failure (errBegin when nothing was tried). If a node ID has no matching
// address, the list is malformed and an error wrapping errInvalidShardLeaders
// is returned instead of indexing out of range.
func roundRobinPolicy(ctx context.Context, getQueryNodePolicy getQueryNodePolicy, query func(UniqueID, types.QueryNode) error, leaders *querypb.ShardLeadersList) error {
	var (
		channel = leaders.GetChannelName()
		nodeIDs = leaders.GetNodeIds()
		addrs   = leaders.GetNodeAddrs()
	)

	// attempt runs the query against a single shard leader. It is a separate
	// function so that the deferred Stop fires when this attempt ends rather
	// than when roundRobinPolicy returns; otherwise every client opened for a
	// failed attempt would stay open for the rest of the retries.
	attempt := func(nodeID int64, addr string) error {
		qn, err := getQueryNodePolicy(ctx, addr)
		if err != nil {
			log.Warn("fail to get valid QueryNode", zap.Int64("nodeID", nodeID),
				zap.Error(err))
			return err
		}
		defer qn.Stop()

		if err := query(nodeID, qn); err != nil {
			log.Warn("fail to Query with shard leader",
				zap.String("leader", channel),
				zap.Int64("nodeID", nodeID),
				zap.Error(err))
			return err
		}
		return nil
	}

	// err starts as errBegin so that an empty leader list is still reported
	// as a failure below.
	err := errBegin
	for i, nodeID := range nodeIDs {
		if i >= len(addrs) {
			// Fewer addresses than node IDs: there is nothing to connect to
			// for this node or any node after it.
			return fmt.Errorf("channel %s: node %d has no address (%d node IDs, %d addresses): %w",
				channel, nodeID, len(nodeIDs), len(addrs), errInvalidShardLeaders)
		}
		if i > 0 {
			// Reaching a second or later node means the previous one failed.
			log.Warn("retry with another QueryNode", zap.String("leader", channel), zap.Int64("nodeID", nodeID))
		}

		if err = attempt(nodeID, addrs[i]); err == nil {
			return nil
		}
	}

	return fmt.Errorf("no shard leaders available for channel: %s, leaders: %v, err: %s", channel, nodeIDs, err.Error())
}