Revert "Consume threads of the number of nq (#24410)" (#24554)

This reverts commit 71a7fef5c5.

Signed-off-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
yah01 2023-05-31 16:41:33 +08:00 committed by GitHub
parent 5358da2a10
commit 1c64d0618c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 1 additions and 26 deletions

View File

@ -3,7 +3,6 @@ package tasks
import (
"context"
"fmt"
"sync"
"go.uber.org/atomic"
@ -26,9 +25,7 @@ type Scheduler struct {
queryProcessQueue chan *QueryTask
queryWaitQueue chan *QueryTask
pool *conc.Pool[any]
runningThreadNum int
cond *sync.Cond
pool *conc.Pool[any]
}
func NewScheduler() *Scheduler {
@ -42,7 +39,6 @@ func NewScheduler() *Scheduler {
// queryProcessQueue: make(chan),
pool: conc.NewPool[any](maxReadConcurrency, ants.WithPreAlloc(true)),
cond: sync.NewCond(&sync.Mutex{}),
}
}
@ -155,23 +151,7 @@ func (s *Scheduler) processAll(ctx context.Context) {
}
func (s *Scheduler) process(t Task) {
s.cond.L.Lock()
for s.runningThreadNum >= s.pool.Cap() {
s.cond.Wait()
}
s.runningThreadNum += t.Weight()
s.cond.L.Unlock()
s.pool.Submit(func() (any, error) {
defer func() {
s.cond.L.Lock()
defer s.cond.L.Unlock()
s.runningThreadNum -= t.Weight()
if s.runningThreadNum < s.pool.Cap() {
s.cond.Broadcast()
}
}()
metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
err := t.Execute()

View File

@ -25,7 +25,6 @@ type Task interface {
Done(err error)
Canceled() error
Wait() error
Weight() int
}
type SearchTask struct {
@ -236,10 +235,6 @@ func (t *SearchTask) Wait() error {
return <-t.notifier
}
func (t *SearchTask) Weight() int {
return int(t.nq)
}
func (t *SearchTask) Result() *internalpb.SearchResults {
return t.result
}