gf/os/gcron/gcron_entry.go

197 lines
5.8 KiB
Go
Raw Normal View History

2021-01-17 21:46:25 +08:00
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gcron
2018-12-30 11:08:07 +08:00
import (
2021-09-28 19:04:36 +08:00
"context"
"fmt"
"reflect"
"runtime"
"time"
2019-07-29 21:01:19 +08:00
2021-10-11 21:41:56 +08:00
"github.com/gogf/gf/v2/container/gtype"
2021-11-15 20:26:31 +08:00
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
2021-10-11 21:41:56 +08:00
"github.com/gogf/gf/v2/os/gtimer"
"github.com/gogf/gf/v2/util/gconv"
2018-12-30 11:08:07 +08:00
)
// JobFunc is the type of the callback function executed by a cron entry.
// It is an alias of gtimer.JobFunc.
type JobFunc = gtimer.JobFunc
// Entry is timing task entry.
type Entry struct {
2021-08-20 11:52:22 +08:00
cron *Cron // Cron object belonged to.
timerEntry *gtimer.Entry // Associated timer Entry.
schedule *cronSchedule // Timed schedule object.
jobName string // Callback function name(address info).
times *gtype.Int // Running times limit.
infinite *gtype.Bool // No times limit.
Name string // Entry name.
Job JobFunc `json:"-"` // Callback function.
2021-08-20 11:52:22 +08:00
Time time.Time // Registered time.
2018-12-30 11:08:07 +08:00
}
type doAddEntryInput struct {
Name string // Name names this entry for manual control.
Job JobFunc // Job is the callback function for timed task execution.
Ctx context.Context // The context for the job.
Times int // Times specifies the running limit times for the entry.
Pattern string // Pattern is the crontab style string for scheduler.
IsSingleton bool // Singleton specifies whether timed task executing in singleton mode.
Infinite bool // Infinite specifies whether this entry is running with no times limit.
2021-08-20 11:52:22 +08:00
}
// doAddEntry creates and returns a new Entry object.
func (c *Cron) doAddEntry(in doAddEntryInput) (*Entry, error) {
2021-08-20 11:52:22 +08:00
if in.Name != "" {
if c.Search(in.Name) != nil {
return nil, gerror.NewCodef(gcode.CodeInvalidOperation, `cron job "%s" already exists`, in.Name)
2021-08-20 11:52:22 +08:00
}
}
schedule, err := newSchedule(in.Pattern)
2019-06-19 09:06:52 +08:00
if err != nil {
return nil, err
}
2021-08-20 11:52:22 +08:00
// No limit for `times`, for timer checking scheduling every second.
entry := &Entry{
2019-06-19 09:06:52 +08:00
cron: c,
schedule: schedule,
2021-08-20 11:52:22 +08:00
jobName: runtime.FuncForPC(reflect.ValueOf(in.Job).Pointer()).Name(),
times: gtype.NewInt(in.Times),
infinite: gtype.NewBool(in.Infinite),
Job: in.Job,
2019-06-19 09:06:52 +08:00
Time: time.Now(),
}
2021-08-20 11:52:22 +08:00
if in.Name != "" {
entry.Name = in.Name
2019-06-19 09:06:52 +08:00
} else {
2021-08-20 11:52:22 +08:00
entry.Name = "cron-" + gconv.String(c.idGen.Add(1))
2019-06-19 09:06:52 +08:00
}
// When you add a scheduled task, you cannot allow it to run.
2021-08-20 11:52:22 +08:00
// It cannot start running when added to timer.
// It should start running after the entry is added to the Cron entries map, to avoid the task
// from running during adding where the entries do not have the entry information, which might cause panic.
entry.timerEntry = gtimer.AddEntry(
in.Ctx,
time.Second,
2022-05-09 21:45:57 +08:00
entry.checkAndRun,
in.IsSingleton,
-1,
gtimer.StatusStopped,
)
2019-06-19 09:06:52 +08:00
c.entries.Set(entry.Name, entry)
2021-08-20 11:52:22 +08:00
entry.timerEntry.Start()
2019-06-19 09:06:52 +08:00
return entry, nil
2018-12-30 11:08:07 +08:00
}
// IsSingleton return whether this entry is a singleton timed task.
func (entry *Entry) IsSingleton() bool {
2021-08-20 11:52:22 +08:00
return entry.timerEntry.IsSingleton()
2019-01-16 22:34:22 +08:00
}
// SetSingleton sets the entry running in singleton mode.
func (entry *Entry) SetSingleton(enabled bool) {
2021-08-20 11:52:22 +08:00
entry.timerEntry.SetSingleton(enabled)
2019-01-16 22:34:22 +08:00
}
// SetTimes sets the times which the entry can run.
func (entry *Entry) SetTimes(times int) {
2019-06-19 09:06:52 +08:00
entry.times.Set(times)
2021-08-20 11:52:22 +08:00
entry.infinite.Set(false)
2018-12-30 11:08:07 +08:00
}
// Status returns the status of entry.
func (entry *Entry) Status() int {
2021-08-20 11:52:22 +08:00
return entry.timerEntry.Status()
2018-12-30 11:08:07 +08:00
}
// SetStatus sets the status of the entry.
func (entry *Entry) SetStatus(status int) int {
2021-08-20 11:52:22 +08:00
return entry.timerEntry.SetStatus(status)
2019-01-16 22:34:22 +08:00
}
// Start starts running the entry.
func (entry *Entry) Start() {
2021-08-20 11:52:22 +08:00
entry.timerEntry.Start()
}
// Stop stops running the entry.
func (entry *Entry) Stop() {
2021-08-20 11:52:22 +08:00
entry.timerEntry.Stop()
}
2019-01-16 22:34:22 +08:00
// Close stops and removes the entry from cron.
func (entry *Entry) Close() {
2019-06-19 09:06:52 +08:00
entry.cron.entries.Remove(entry.Name)
2021-08-20 11:52:22 +08:00
entry.timerEntry.Close()
2019-01-21 22:09:51 +08:00
}
2022-05-09 21:45:57 +08:00
// checkAndRun is the core timing task check logic.
// The running times limits feature is implemented by gcron.Entry and cannot be implemented by gtimer.Entry.
// gcron.Entry relies on gtimer to implement a scheduled task check for gcron.Entry per second.
2022-05-09 21:45:57 +08:00
func (entry *Entry) checkAndRun(ctx context.Context) {
2019-06-19 09:06:52 +08:00
if entry.schedule.meet(time.Now()) {
switch entry.cron.status.Val() {
2020-12-14 13:26:48 +08:00
case StatusStopped:
2019-06-19 09:06:52 +08:00
return
2020-12-14 13:26:48 +08:00
case StatusClosed:
entry.logDebugf(ctx, `cron job "%s" is removed`, entry.getJobNameWithPattern())
2019-06-19 09:06:52 +08:00
entry.Close()
2020-12-14 13:26:48 +08:00
case StatusReady:
2019-06-19 09:06:52 +08:00
fallthrough
2020-12-14 13:26:48 +08:00
case StatusRunning:
2021-08-08 13:56:26 +08:00
defer func() {
if exception := recover(); exception != nil {
entry.logErrorf(ctx,
`cron job "%s(%s)" end with error: %+v`,
entry.jobName, entry.schedule.pattern, exception,
)
2021-08-08 13:56:26 +08:00
} else {
entry.logDebugf(ctx, `cron job "%s" ends`, entry.getJobNameWithPattern())
2021-08-08 13:56:26 +08:00
}
2021-08-20 11:52:22 +08:00
if entry.timerEntry.Status() == StatusClosed {
2021-08-08 13:56:26 +08:00
entry.Close()
}
}()
2019-06-19 09:06:52 +08:00
// Running times check.
2021-08-20 11:52:22 +08:00
if !entry.infinite.Val() {
times := entry.times.Add(-1)
if times <= 0 {
if entry.timerEntry.SetStatus(StatusClosed) == StatusClosed || times < 0 {
return
}
2019-06-19 09:06:52 +08:00
}
}
entry.logDebugf(ctx, `cron job "%s" starts`, entry.getJobNameWithPattern())
2021-08-08 13:56:26 +08:00
2022-01-20 15:53:16 +08:00
entry.Job(ctx)
2019-06-19 09:06:52 +08:00
}
}
2019-01-16 22:34:22 +08:00
}
2022-01-20 15:53:16 +08:00
// getJobNameWithPattern returns the job name followed by the schedule pattern
// in parentheses, in the form "name(pattern)". It is used in log messages.
func (entry *Entry) getJobNameWithPattern() string {
	name, pattern := entry.jobName, entry.schedule.pattern
	return fmt.Sprintf(`%s(%s)`, name, pattern)
}
2021-09-28 19:04:36 +08:00
func (entry *Entry) logDebugf(ctx context.Context, format string, v ...interface{}) {
2021-08-20 11:52:22 +08:00
if logger := entry.cron.GetLogger(); logger != nil {
2021-09-28 19:04:36 +08:00
logger.Debugf(ctx, format, v...)
2021-08-20 11:52:22 +08:00
}
}
2021-09-28 19:04:36 +08:00
func (entry *Entry) logErrorf(ctx context.Context, format string, v ...interface{}) {
2021-08-20 11:52:22 +08:00
if logger := entry.cron.GetLogger(); logger != nil {
2021-09-28 19:04:36 +08:00
logger.Errorf(ctx, format, v...)
2021-08-20 11:52:22 +08:00
}
}