enhance: move the search utilities from querynode into new module (#37531)

issue: #33285

- The search utilities will be shared between query node and streaming
node.

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2024-11-11 11:10:27 +08:00 committed by GitHub
parent 5e90f348fc
commit fca946dee1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 26 additions and 22 deletions

View File

@ -39,13 +39,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/delegator/deletebuffer"
"github.com/milvus-io/milvus/internal/querynodev2/optimizers"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/function"
"github.com/milvus-io/milvus/internal/util/reduce"
"github.com/milvus-io/milvus/internal/util/searchutil/optimizers"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"

View File

@ -50,16 +50,16 @@ import (
grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/optimizers"
"github.com/milvus-io/milvus/internal/querynodev2/pipeline"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tasks"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/registry"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/internal/util/searchutil/optimizers"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
@ -113,7 +113,7 @@ type QueryNode struct {
loader segments.Loader
// Search/Query
scheduler tasks.Scheduler
scheduler scheduler.Scheduler
// etcd client
etcdCli *clientv3.Client
@ -339,7 +339,7 @@ func (node *QueryNode) Init() error {
}
schedulePolicy := paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue()
node.scheduler = tasks.NewScheduler(
node.scheduler = scheduler.NewScheduler(
schedulePolicy,
)

View File

@ -34,10 +34,10 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/optimizers"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/searchutil/optimizers"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

View File

@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tasks"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -689,7 +690,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
return resp, nil
}
var task tasks.Task
var task scheduler.Task
if paramtable.Get().QueryNodeCfg.UseStreamComputing.GetAsBool() {
task = tasks.NewStreamingSearchTask(searchCtx, collection, node.manager, req, node.serverID)
} else {

View File

@ -6,10 +6,11 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/internal/util/streamrpc"
)
var _ Task = &QueryStreamTask{}
var _ scheduler.Task = &QueryStreamTask{}
func NewQueryStreamTask(ctx context.Context,
collection *segments.Collection,

View File

@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -22,7 +23,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var _ Task = &QueryTask{}
var _ scheduler.Task = &QueryTask{}
func NewQueryTask(ctx context.Context,
collection *segments.Collection,

View File

@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -30,8 +31,8 @@ import (
)
var (
_ Task = &SearchTask{}
_ MergeTask = &SearchTask{}
_ scheduler.Task = &SearchTask{}
_ scheduler.MergeTask = &SearchTask{}
)
type SearchTask struct {
@ -346,7 +347,7 @@ func (t *SearchTask) NQ() int64 {
return t.nq
}
func (t *SearchTask) MergeWith(other Task) bool {
func (t *SearchTask) MergeWith(other scheduler.Task) bool {
switch other := other.(type) {
case *SearchTask:
return t.Merge(other)
@ -416,7 +417,7 @@ func NewStreamingSearchTask(ctx context.Context,
}
}
func (t *StreamingSearchTask) MergeWith(other Task) bool {
func (t *StreamingSearchTask) MergeWith(other scheduler.Task) bool {
return false
}

View File

@ -1,4 +1,4 @@
package tasks
package scheduler
import (
"github.com/milvus-io/milvus/pkg/util/paramtable"

View File

@ -1,4 +1,4 @@
package tasks
package scheduler
import (
"context"

View File

@ -1,4 +1,4 @@
package tasks
package scheduler
import (
"fmt"

View File

@ -1,4 +1,4 @@
package tasks
package scheduler
import (
"container/ring"

View File

@ -1,4 +1,4 @@
package tasks
package scheduler
import (
"fmt"

View File

@ -1,4 +1,4 @@
package tasks
package scheduler
import "github.com/milvus-io/milvus/internal/proto/internalpb"