timecloud-core
A distributed scheduling system based on Node.js
Features
- 轻量级内核
- 集群部署, 防止单点故障
- MongoDB持久化存储
- 任务优先级排列
- 任务监控,进程控制
Installation
通过NPM安装
npm install timecloud-core
timecloud-core简介
timecloud-core是一个定时任务管理模块的轻量级内核,它将node中的定时任务存储在数据库中,通过事件回调与监听实现定时任务调度。
使用步骤概述
总的来说分4步:
- 初始化(
new Core(..)
) - 定义任务(
core.define(..)
) - 配置任务(
core.every(..)
) - 最后设置事件监听(
core.on(..)
const Core = const connection_options = db: address: 'mongodb://127.0.0.1:27017/timecloud' collection: 'timecloudJobs' options: server: auto_reconnect: true // 初始化内核let core = connection_optionscore name'TIMECLOUD TEST - ' + processpid // 定义任务core// 配置任务(需要在ready事件中完成)core// 设置监听core core core core// 最后,优雅的退出方案 { core;} process;process;
步骤详述
初始化
初始化有两种方案:
- 通过完整的参数传递
- 链式初始化(推荐使用)
部分说明:
.database(url, [collectionName])
连接数据库,例如core.database('mongodb://127.0.0.1:27017/core', 'timecloudJobs')
,其中collectionName
可省略,默认值为timecloudJobs
.name(string)
jobs存储于数据库中会有lastModifiedBy
字段,存储调度这个任务的timecloud-core名字.processEvery(interval)
用于设置timecloud-core轮询数据库检测是否有新的任务加入的扫描时间.defaultLockLimit(number)
设置默认值,timecloud-core调度任务时会锁定该任务,防止其他timecloud-core重复执行任务,当任务超出lockLifetime或调用done时会解锁任务,define新任务时如果设置.lockLimit(number)
则会覆盖默认值。该值用于设置锁定任务的数量,默认为0,即无限制.defaultLockLifetime(number)
上面说了,当任务被调度时会被锁定,当任务调用完毕执行done()
后会解锁,或者执行的时间很长,超出了lockLifetime也会解锁,同样,define其他任务时如果设置了lockLifetime参数,会覆盖掉默认值
定义任务
core.define(jobName, [options], fn)
core
参数说明:
jobName
类型String
,用于存储于数据库中的name
字段,同时配置任务和设置监听会使用到该参数
options
可配置concurrency
、lockLimit
、lockLifetime
、priority
,前文都已说过,
priority
为任务优先级:lowest|low|normal|high|highest|number
,前五个是字符串,传入后会被转为数字,依次对应:-20|-10|0|10|20
fn(job, done)
:
- 配置任务时,会设置每次执行任务需要传入的参数,该参数都被放在了
job.attrs.data
中, - 任务出错,会调用
job.fail(reason)
方法,其中的reason
可以是字符串或Error
,当调用此方法时,会设置job.attrs.failedAt
为当前时间(ISO),job.attrs.failReason
为reason
或Error.message
- 对于异步任务,必须调用
done
方法告知任务结束 - 对于同步任务,省略为
fn(job)
即可
配置任务
core.every(interval, name, [data], [options], [cb])
corecorecore
参数说明
interval
可以是字符串,或cron时间,cron时间格式为
* * * * * *
| | | | | |
| | | | | +-- Day of the Week (range: 0-6, 0 standing for Monday)
| | | | +---- Month (range: 0-11)
| | | +------ Day of the Month (range: 1-31)
| | +-------- Hour (range: 0-23)
| +---------- Minute (range: 0-59)
+------------ Second (range: 0-59)
设置每隔几分钟的任务用'5 minutes'
就好了,但是如果设置每天的什么时候运行,就要使用cron设置,且要注意时区(在options中设置)!!
name
就是任务的名字,一定要对应,可以是字符串数组,这样的话多个任务就使用同一个配置
data
是一个Object
,在define
任务时通过job.attrs.data
获取该对象
options
注意,要设置options
必须先设置data
,所以data可以传空Object{}
,在options
中可以设置的有timezone
,必须是可识别的时区字符串,默认为'America/New_York
,而在我们的时区,必须要设置对时区,才能正常运行定时任务!我们的时区字符串是Asia/Shanghai
cb
当job
成功被加入数据库时会调用回调函数
设置监听
core core
可以监听的事件有:
'ready', function()
'start', function(job)
'start:job name', function(job)
'complete', function(job)
'complete:job name', function(job)
'success', function(job)
'success:job name', function(job)
'fail', function(err, job)
'fail: job name', function(err, job)
其中比较重要的事件就是ready
了,当timecloud-core初始化完毕成功连接数据库后,会发出ready
事件,此时就要通过监听该事件,在ready
时间中对define
过的任务配置时间等参数,最后调用core.start()
启动core
我们发现,在ready
事件中,我们还调用了core.purge(cb)
,这个函数是清理数据库中的旧任务(即这此运行程序没有被define到的job)
注意事项
- timecloud-core在mongoDB中的存储方式:
{
"_id" : ObjectId("5981983d9b85994179fc98c9"),
"name" : "updateCampaignTimeout",
"type" : "single",
"data" : {
},
"priority" : 10,
"repeatInterval" : "00 09 18 * * *",
"repeatTimezone" : "Asia/Shanghai",
"lastModifiedBy" : "TIMECLOUD TEST - 97807",
"nextRunAt" : ISODate("2017-08-03T10:09:00.011Z"),
"lockedAt" : null,
"lastRunAt" : ISODate("2017-08-02T10:09:00.011Z"),
"lastFinishedAt" : ISODate("2017-08-02T10:09:00.076Z"),
"failReason" : "failed to calculate nextRunAt due to invalid repeat interval",
"failCount" : 1,
"failedAt" : ISODate("2017-08-02T09:44:54.551Z")
}
-
重复启动timecloud-core时,会更新对应的项,而不是覆盖,只要任务名相同,就不会在数据库中创建新的数据项,也不会删除旧的job(该次执行程序没有define到的job),除非在ready事件中调用
core.purge
-
在配置任务(
core.every
)时一定要注意时区的设置 -
core.every
配置失败时(传入的interval不对等原因),会返回undefined
,正常情况下返回job
对象 -
关于job对象:
Job core: EventEmitter _name: 'TIMECLOUD TEST - 97807' _processEvery: 5000 _defaultConcurrency: 5 _maxConcurrency: 20 _defaultLockLimit: 0 _lockLimit: 0 _definitions: updateCampaignTimeout: Object updateCampaignOverBudget: Object synchronizeBudget: Object _runningJobs: _lockedJobs: _jobQueue: _defaultLockLifetime: 10000 _isLockingOnTheFly: false _jobsToLock: _events: ready: Function start: Function complete: Function success: Function fail: Function _eventsCount: 5 _mdb: Db domain: null _events: {} _eventsCount: 0 _maxListeners: undefined s: Object serverConfig: Getter bufferMaxEntries: Getter databaseName: Getter _collection: Collection s: Object attrs: name: 'updateCampaignTimeout' data: {} type: 'single' priority: 10 nextRunAt: moment repeatInterval: '00 09 18 * * *' repeatTimezone: 'Asia/Shanghai'
Example Usage
let mongoConnectionString = 'mongodb://127.0.0.1/timecloud'; let core = db: address: mongoConnectionString; // 手动指定MongoDB Collection名称:// let core = new Core({db: {address: mongoConnectionString, collection: 'jobCollectionName'}}); // 手动设定MongoDB Collection选项:// let core = new Core({db: {address: mongoConnectionString, collection: 'jobCollectionName', options: {ssl: true}}}); // 手动传入初始化好的MongoClient// let core = new Core({mongo: myMongoClient}); core; core;
core; core;
core;