golang 中的 cronjob

科技资讯 投稿 12800 0 评论

golang 中的 cronjob

引言

经过筛选,选用了 cron 这个库,它支持 linux cronjob 语法取配置定时任务,还支持@every 10s@hourly 等描述符去配置定时任务,完全满足我们要求,比如下面的例子:

package main

import (
	"fmt"

	"github.com/natefinch/lumberjack"
	"github.com/robfig/cron/v3"
	"github.com/sirupsen/logrus"


type CronLogger struct {
	clog *logrus.Logger
}

func (l *CronLogger Info(msg string, keysAndValues ...interface{} {
	l.clog.WithFields(logrus.Fields{
		"data": keysAndValues,
	}.Info(msg
}

func (l *CronLogger Error(err error, msg string, keysAndValues ...interface{} {
	l.clog.WithFields(logrus.Fields{
		"msg":  msg,
		"data": keysAndValues,
	}.Warn(err.Error(
}

func main( {
	logger := logrus.New(
	_logger := &lumberjack.Logger{
		Filename:   "./test.log",
		MaxSize:    50,
		MaxAge:     15,
		MaxBackups: 5,
	}

	logger.SetOutput(_logger
	logger.SetFormatter(&logrus.JSONFormatter{
		DisableHTMLEscape: true,
	}

	c := cron.New(cron.WithLogger(&CronLogger{
		clog: logger,
	}
	c.AddFunc("*/5 * * * *", func( {
		fmt.Println("你的流量包即将过期了"
	}
	c.AddFunc("*/2 * * * *", func( {
		fmt.Println("你的转码包即将过期了"
	}
	c.Start(

	for {
		select {}
	}
}

使用了 cronjob、并结合了 golang 的 log 组建,输出日志到文件,使用很方便。

类库介绍

扩展性强

此类库扩展性挺强,通过 JobWrapper 去包装一个任务,NewChain(w1, w2, w3.Then(job,相关实现如下:

type JobWrapper func(Job Job
type Chain struct {
	wrappers []JobWrapper
}
func NewChain(c ...JobWrapper Chain {
	return Chain{c}
}
func (c Chain Then(j Job Job {
	for i := range c.wrappers {
		j = c.wrappers[len(c.wrappers-i-1](j
	}
	return j
}

比如当前脚本如果还没有执行完,下次任务时间又到了,就可以通过如下默认提供的 wrapper 去避免继续执行。可以看到最后执行的任务 j.Run( 被包装在了一个函数闭包中,并且根据闭包中的 channel 去判断是否执行,避免重复执行。首次执行的时候,容量为 1 的 channel 中已经有数据了,重复执行时,channel 无数据,默认跳过,等上次任务执行完成后,又像 channel 中写入一条数据,下次 channel 可以读出数据,又可以执行任务了:

func SkipIfStillRunning(j Job Job {
	var ch = make(chan struct{}, 1
	ch <- struct{}{}
	return FuncJob(func( {
		select {
		case v := <-ch:
			defer func( { ch <- v }(
			j.Run(
		default:
			// "skip"
		}
	}
}

主流程

cron 主流程是启动一个协程,里面有双重 for 循环,下面我们来一起分析一下。

定时器

sort.Sort(byTime(c.entries
timer = time.NewTimer(c.entries[0].Next.Sub(now

事件循环

事件循环中,包含了很多事件,比如 添加任务停止移除任务,当 cron 启动之后,这些任务都是异步的。比如添加任务,不会直接将任务信息写入内存中,而是进入事件循环,加入之后,重新计算第一二层循环,避免了正在修改任务信息,又执行任务信息,然后出错的情况。

for {
	select {
	case now = <-timer.C:
		// 执行任务
	case newEntry := <-c.add:
		// 添加任务
	case replyChan := <-c.snapshot:
		// 获取任务信息
	case <-c.stop:
		//  停止任务
	case id := <-c.remove:
		// 移除任务
	}
	break
}

类库改造

在了解了项目的基本情况之后,对项目做了部分改造,方便使用。

打印任务列表信息

SIGUSR1,将任务信息输出到日志:

usrSig := make(chan os.Signal, 1
signal.Notify(usrSig, syscall.SIGUSR1

for {
	select {
	case <-usrSig:
		// 启动单独的协程去打印定时任务执行信息
		continue
	}
	break
}

根据名称移除脚本

目前脚本只能根据脚本 id 去移除要执行的任务,执行过程中,也不能通过命令去移除任务,不是太方便。比如有个脚本马上要执行了,但是该脚本发现问题了,这时候生产环境的话,就需要更新代码,然后重启服务去下线脚本任务,这时候,黄花菜可能都凉了。

usrSig2 := make(chan os.Signal, 1
signal.Notify(usrSig2, syscall.SIGUSR2

......
case <-usrSig2:
	actionByte, err := os.ReadFile("/tmp/cron.action"
	...... //校验命令正确性
	action := strings.Fields(string(actionByte
	switch action[0] {
	case "removeTag":
		timer.Stop(
		now = c.now(
		c.removeEntryByTag(action[1]
		c.logger.Info("removedByTag", "tag", action[1]
	}
......

改造效果

由于原项目已经 2 年多没有个更新过了,就算发起 pr 估计也不会被处理,所以 fork 一份放在了这里 aizuyan/cron 进行改造,下面是改进之后的代码:

package main

import (
	// 加载配置文件

	"fmt"

	"github.com/aizuyan/cron/v3"


func main( {
	c := cron.New(cron.WithLogger(cron.DefaultLogger
	c.AddFuncWithTag("流量包过期", "*/5 * * * *", func( {
		fmt.Println("你的流量包即将过期了"
	}
	c.AddFuncWithTag("转码包过期", "*/2 * * * *", func( {
		fmt.Println("你的转码包即将过期了"
	}
	c.Start(

	for {
		select {}
	}
}

对每个定时任务增加了一个名称标识,当任务启动后,当我们执行 kill -SIGUSR1 <pid> 的时候,会看到 stdout 输出了运行的任务列表信息:

+----+------------+-------------+---------------------+---------------------+
| ID |    TAG     |    SPEC     |        PREV         |        NEXT         |
+----+------------+-------------+---------------------+---------------------+
|  2 | 转码包过期 | */2 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:22:00 |
|  1 | 流量包过期 | */5 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:25:00 |
+----+------------+-------------+---------------------+---------------------+

执行 kill -SIGUSR2 <pid>,移除转码包过期任务,避免了使用 ID 容易出错的问题。

cat /tmp/cron.action 
removeTag 转码包过期
// {"data":["tag","转码包过期"],"level":"info","msg":"removedByTag","time":"2023-04-02T18:32:56+08:00"}

放目前为止,是不是更好用了,基本能满足我们的需求了,也可以自己去再做各种扩展。

编程笔记 » golang 中的 cronjob

赞同 (66) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽