feat: add scheduler

parent 55b0b00c
......@@ -148,6 +148,7 @@
"passport-twitch-strategy": "2.2.0",
"pem-jwk": "2.0.0",
"pg": "8.8.0",
"pg-boss": "8.0.0",
"pg-hstore": "2.3.4",
"pg-pubsub": "0.8.0",
"pg-query-stream": "4.2.4",
......
......@@ -75,7 +75,7 @@ module.exports = {
await WIKI.models.commentProviders.initProvider()
await WIKI.models.sites.reloadCache()
await WIKI.models.storage.initTargets()
// WIKI.scheduler.start()
WIKI.scheduler.start()
await WIKI.models.subscribeToNotifications()
},
......
......@@ -4,8 +4,7 @@ const _ = require('lodash')
const moment = require('moment')
const CSR = require('@root/csr')
const PEM = require('@root/pem')
// eslint-disable-next-line node/no-deprecated-api
const punycode = require('punycode')
const punycode = require('punycode/')
/* global WIKI */
......
const moment = require('moment')
const childProcess = require('child_process')
const _ = require('lodash')
const configHelper = require('../helpers/config')
const PgBoss = require('pg-boss')
/* global WIKI */
class Job {
constructor({
name,
immediate = false,
schedule = 'P1D',
repeat = false,
worker = false
}, queue) {
this.queue = queue
this.finished = Promise.resolve()
this.name = name
this.immediate = immediate
this.schedule = moment.duration(schedule)
this.repeat = repeat
this.worker = worker
}
/**
* Start Job
*
* @param {Object} data Job Data
*/
start(data) {
this.queue.jobs.push(this)
if (this.immediate) {
this.invoke(data)
} else {
this.enqueue(data)
}
}
/**
* Queue the next job run according to the wait duration
*
* @param {Object} data Job Data
*/
enqueue(data) {
this.timeout = setTimeout(this.invoke.bind(this), this.schedule.asMilliseconds(), data)
}
/**
* Run the actual job
*
* @param {Object} data Job Data
*/
async invoke(data) {
try {
if (this.worker) {
const proc = childProcess.fork(`server/core/worker.js`, [
`--job=${this.name}`,
`--data=${data}`
], {
cwd: WIKI.ROOTPATH,
stdio: ['inherit', 'inherit', 'pipe', 'ipc']
})
const stderr = [];
proc.stderr.on('data', chunk => stderr.push(chunk))
this.finished = new Promise((resolve, reject) => {
proc.on('exit', (code, signal) => {
const data = Buffer.concat(stderr).toString()
if (code === 0) {
resolve(data)
} else {
const err = new Error(`Error when running job ${this.name}: ${data}`)
err.exitSignal = signal
err.exitCode = code
err.stderr = data
reject(err)
}
proc.kill()
})
})
} else {
this.finished = require(`../jobs/${this.name}`)(data)
}
await this.finished
} catch (err) {
WIKI.logger.warn(err)
}
if (this.repeat && this.queue.jobs.includes(this)) {
this.enqueue(data)
} else {
this.stop().catch(() => {})
}
}
/**
* Stop any future job invocation from occuring
*/
async stop() {
clearTimeout(this.timeout)
this.queue.jobs = this.queue.jobs.filter(x => x !== this)
return this.finished
}
}
module.exports = {
scheduler: null,
jobs: [],
init() {
return this
},
start() {
_.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
if (WIKI.config.offline && queueParams.offlineSkip) {
WIKI.logger.warn(`Skipping job ${queueName} because offline mode is enabled. [SKIPPED]`)
return
}
const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams.schedule : 'P1D'
this.registerJob({
name: _.kebabCase(queueName),
immediate: _.get(queueParams, 'onInit', false),
schedule: schedule,
repeat: _.get(queueParams, 'repeat', false),
worker: _.get(queueParams, 'worker', false)
})
init () {
WIKI.logger.info('Initializing Scheduler...')
this.scheduler = new PgBoss({
...WIKI.models.knex.client.connectionSettings,
application_name: 'Wiki.js Scheduler',
schema: WIKI.config.db.schemas.scheduler,
uuid: 'v4'
})
return this
},
registerJob(opts, data) {
const job = new Job(opts, this)
job.start(data)
return job
async start () {
WIKI.logger.info('Starting Scheduler...')
await this.scheduler.start()
WIKI.logger.info('Scheduler: [ STARTED ]')
},
async stop() {
return Promise.all(this.jobs.map(job => job.stop()))
async stop () {
WIKI.logger.info('Stopping Scheduler...')
await this.scheduler.stop()
WIKI.logger.info('Scheduler: [ STOPPED ]')
}
}
const path = require('path')
let WIKI = {
IS_DEBUG: process.env.NODE_ENV === 'development',
ROOTPATH: process.cwd(),
SERVERPATH: path.join(process.cwd(), 'server'),
Error: require('../helpers/error'),
configSvc: require('./config')
}
global.WIKI = WIKI
WIKI.configSvc.init()
WIKI.logger = require('./logger').init('JOB')
const args = require('yargs').argv
;(async () => {
try {
await require(`../jobs/${args.job}`)(args.data)
process.exit(0)
} catch (e) {
await new Promise(resolve => process.stderr.write(e.message, resolve))
process.exit(1)
}
})()
......@@ -77,6 +77,7 @@ useMeta({
> img {
height: 200px;
user-select: none;
}
}
......
This diff was suppressed by a .gitattributes entry.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment