milvus/internal/querynodev2/tasks/tasks.go
Bingyi Sun fecd9c21ba
feat: LRU cache implementation (#32567)
issue: https://github.com/milvus-io/milvus/issues/32783
This pr is the implementation of lru cache on branch lru-dev.

Signed-off-by: sunby <sunbingyi1992@gmail.com>
Co-authored-by: chyezh <chyezh@outlook.com>
Co-authored-by: MrPresent-Han <chun.han@zilliz.com>
Co-authored-by: Ted Xu <ted.xu@zilliz.com>
Co-authored-by: jaime <yun.zhang@zilliz.com>
Co-authored-by: wayblink <anyang.wang@zilliz.com>
2024-05-06 20:29:30 +08:00

112 lines
2.5 KiB
Go

package tasks
import "github.com/milvus-io/milvus/internal/proto/internalpb"
const (
schedulePolicyNameFIFO = "fifo"
schedulePolicyNameUserTaskPolling = "user-task-polling"
)
// NewScheduler create a scheduler by policyName.
func NewScheduler(policyName string) Scheduler {
switch policyName {
case "":
fallthrough
case schedulePolicyNameFIFO:
return newScheduler(
newFIFOPolicy(),
)
case schedulePolicyNameUserTaskPolling:
return newScheduler(
newUserTaskPollingPolicy(),
)
default:
panic("invalid schedule task policy")
}
}
// tryIntoMergeTask convert inner task into MergeTask,
// Return nil if inner task is not a MergeTask.
func tryIntoMergeTask(t Task) MergeTask {
if mt, ok := t.(MergeTask); ok {
return mt
}
return nil
}
type Scheduler interface {
// Add a new task into scheduler, follow some constraints.
// 1. It's a non-block operation.
// 2. Error will be returned if scheduler reaches some limit.
// 3. Concurrent safe.
Add(task Task) error
// Start schedule the owned task asynchronously and continuously.
// Shall be called only once
Start()
// Stop make scheduler deny all incoming tasks
// and cleans up all related resources
Stop()
// GetWaitingTaskTotalNQ
GetWaitingTaskTotalNQ() int64
// GetWaitingTaskTotal
GetWaitingTaskTotal() int64
}
// schedulePolicy is the policy of scheduler.
type schedulePolicy interface {
// Push add a new task into scheduler.
// Return the count of new task added (task may be chunked, merged or dropped)
// 0 and an error will be returned if scheduler reaches some limit.
Push(task Task) (int, error)
// Pop get the task next ready to run.
Pop() Task
Len() int
}
// MergeTask is a Task which can be merged with other task
type MergeTask interface {
Task
// MergeWith other task, return true if merge success.
// After success, the task merged should be dropped.
MergeWith(Task) bool
}
// A task is execute unit of scheduler.
type Task interface {
// Return the username which task is belong to.
// Return "" if the task do not contain any user info.
Username() string
// Return whether the task would be running on GPU.
IsGpuIndex() bool
// PreExecute the task, only call once.
PreExecute() error
// Execute the task, only call once.
Execute() error
// Done notify the task finished.
Done(err error)
// Check if the Task is canceled.
// Concurrent safe.
Canceled() error
// Wait for task finish.
// Concurrent safe.
Wait() error
// Return the NQ of task.
NQ() int64
SearchResult() *internalpb.SearchResults
}