Unverified Commit 5c6ae08b authored by Nicolas Giard, committed by GitHub

feat: use graphile-worker as scheduler (#5698)

parent 7be5943c
......@@ -13,3 +13,4 @@ globals:
document: false
navigator: false
window: false
WIKI: true
......@@ -124,9 +124,11 @@ dataPath: ./data
bodyParserLimit: 5mb
# ---------------------------------------------------------------------
# Workers Limit
# Scheduler
# ---------------------------------------------------------------------
# Maximum number of workers that can run background cpu-intensive jobs.
# Leave to 'auto' to use CPU cores count as maximum.
workers: auto
scheduler:
# Maximum number of workers to run background cpu-intensive jobs.
# Make sure your PostgreSQL server can handle an extra connection
# for each worker!
workers: 3
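Note: scheduler.js below still accepts the special value 'auto' for this setting, falling back to the CPU core count. A minimal sketch of that alternative (values illustrative):

scheduler:
  # 'auto' sizes the worker pool to the number of CPU cores;
  # each worker still needs its own PostgreSQL connection.
  workers: auto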
......@@ -81,6 +81,7 @@
"filesize": "6.1.0",
"fs-extra": "9.0.1",
"getos": "3.2.1",
"graphile-worker": "0.13.0",
"graphql": "16.3.0",
"graphql-list-fields": "2.0.2",
"graphql-rate-limit-directive": "2.0.2",
......@@ -148,7 +149,6 @@
"passport-twitch-strategy": "2.2.0",
"pem-jwk": "2.0.0",
"pg": "8.8.0",
"pg-boss": "8.0.0",
"pg-hstore": "2.3.4",
"pg-pubsub": "0.8.0",
"pg-query-stream": "4.2.4",
......
......@@ -29,7 +29,8 @@ defaults:
offline: false
dataPath: ./data
bodyParserLimit: 5mb
workers: auto
scheduler:
workers: 3
# DB defaults
api:
isEnabled: false
......@@ -78,27 +79,23 @@ defaults:
maxHits: 100
maintainerEmail: security@requarks.io
jobs:
purgeUploads:
onInit: true
schedule: '*/15 * * * *'
offlineSkip: false
repeat: true
syncGraphLocales:
onInit: true
schedule: '0 0 * * *'
offlineSkip: true
repeat: true
syncGraphUpdates:
onInit: true
schedule: '0 0 * * *'
offlineSkip: true
repeat: true
rebuildTree:
onInit: true
offlineSkip: false
repeat: false
immediate: true
worker: true
- task: background
pattern: '*/15 * * * *'
payload:
name: purgeUploads
data: {}
# - task: simple
# identifier: letest
# pattern: '* * * * *'
# payload:
# name: bob
# data: {}
# - task: simple
# identifier: letest2
# pattern: '* * * * *'
# payload:
# name: bob
# data: {}
groups:
defaultPermissions:
- 'read:pages'
......
......@@ -18,6 +18,7 @@ module.exports = {
Objection,
knex: null,
listener: null,
config: null,
/**
* Initialize DB
*/
......@@ -28,7 +29,7 @@ module.exports = {
// Fetch DB Config
const dbConfig = (!_.isEmpty(process.env.DATABASE_URL)) ? process.env.DATABASE_URL : {
this.config = (!_.isEmpty(process.env.DATABASE_URL)) ? process.env.DATABASE_URL : {
host: WIKI.config.db.host.toString(),
user: WIKI.config.db.user.toString(),
password: WIKI.config.db.pass.toString(),
......@@ -40,7 +41,7 @@ module.exports = {
let dbUseSSL = (WIKI.config.db.ssl === true || WIKI.config.db.ssl === 'true' || WIKI.config.db.ssl === 1 || WIKI.config.db.ssl === '1')
let sslOptions = null
if (dbUseSSL && _.isPlainObject(dbConfig) && _.get(WIKI.config.db, 'sslOptions.auto', null) === false) {
if (dbUseSSL && _.isPlainObject(this.config) && _.get(WIKI.config.db, 'sslOptions.auto', null) === false) {
sslOptions = WIKI.config.db.sslOptions
sslOptions.rejectUnauthorized = sslOptions.rejectUnauthorized !== false
if (sslOptions.ca && sslOptions.ca.indexOf('-----') !== 0) {
......@@ -73,8 +74,8 @@ module.exports = {
}
}
if (dbUseSSL && _.isPlainObject(dbConfig)) {
dbConfig.ssl = (sslOptions === true) ? { rejectUnauthorized: true } : sslOptions
if (dbUseSSL && _.isPlainObject(this.config)) {
this.config.ssl = (sslOptions === true) ? { rejectUnauthorized: true } : sslOptions
}
// Initialize Knex
......@@ -82,7 +83,7 @@ module.exports = {
client: 'pg',
useNullAsDefault: true,
asyncStackTraces: WIKI.IS_DEBUG,
connection: dbConfig,
connection: this.config,
searchPath: [WIKI.config.db.schemas.wiki],
pool: {
...WIKI.config.pool,
......
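Keeping the resolved connection settings on the module (this.config, reachable as WIKI.models.config) lets other components open pg connections with the same settings; the scheduler below relies on this. A minimal sketch of that consumption pattern (pool size illustrative):

const { Pool } = require('pg')

// WIKI.models.config is either a DATABASE_URL string or a settings object,
// so wrap the string form as { connectionString } before handing it to pg.
const pgPool = new Pool({
  ...(typeof WIKI.models.config === 'string')
    ? { connectionString: WIKI.models.config }
    : WIKI.models.config,
  max: 5 // illustrative cap
})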
......@@ -77,7 +77,6 @@ module.exports = {
await WIKI.models.sites.reloadCache()
await WIKI.models.storage.initTargets()
await WIKI.scheduler.start()
await WIKI.scheduler.registerScheduledJobs()
await WIKI.models.subscribeToNotifications()
},
......
const PgBoss = require('pg-boss')
const { run, parseCronItems, Logger } = require('graphile-worker')
const { Pool } = require('pg')
const { DynamicThreadPool } = require('poolifier')
const { v4: uuid } = require('uuid')
const os = require('node:os')
/* global WIKI */
const path = require('node:path')
module.exports = {
pool: null,
boss: null,
runner: null,
maxWorkers: 1,
async init () {
WIKI.logger.info('Initializing Scheduler...')
this.boss = new PgBoss({
db: {
close: () => Promise.resolve('ok'),
executeSql: async (text, values) => {
try {
const resource = await WIKI.models.knex.client.pool.acquire().promise
const res = await resource.query(text, values)
WIKI.models.knex.client.pool.release(resource)
return res
} catch (err) {
WIKI.logger.error('Failed to acquire DB connection during scheduler query execution.')
WIKI.logger.error(err)
}
}
},
// ...WIKI.models.knex.client.connectionSettings,
application_name: 'Wiki.js Scheduler',
schema: WIKI.config.db.schemas.scheduler,
uuid: 'v4',
archiveCompletedAfterSeconds: 120,
deleteAfterHours: 24
})
this.maxWorkers = WIKI.config.workers === 'auto' ? os.cpus().length : WIKI.config.workers
this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? os.cpus().length : WIKI.config.scheduler.workers
WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
this.pool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', {
errorHandler: (err) => WIKI.logger.warn(err),
......@@ -44,40 +21,49 @@ module.exports = {
},
async start () {
WIKI.logger.info('Starting Scheduler...')
await this.boss.start()
this.boss.work('wk-*', {
teamSize: this.maxWorkers,
teamConcurrency: this.maxWorkers
}, async job => {
WIKI.logger.debug(`Starting job ${job.name}:${job.id}...`)
try {
const result = await this.pool.execute({
id: job.id,
name: job.name,
data: job.data
})
WIKI.logger.debug(`Completed job ${job.name}:${job.id}.`)
job.done(null, result)
} catch (err) {
WIKI.logger.warn(`Failed job ${job.name}:${job.id}): ${err.message}`)
job.done(err)
this.runner = await run({
pgPool: new Pool({
...(typeof WIKI.models.config === 'string') ? {
connectionString: WIKI.models.config
} : WIKI.models.config,
max: this.maxWorkers + 2
}),
schema: WIKI.config.db.schemas.scheduler,
concurrency: this.maxWorkers,
noHandleSignals: true,
logger: new Logger(scope => {
return (level, message, meta) => {
const prefix = (scope?.workerId) ? `[${scope.workerId}] ` : ''
WIKI.logger[level](`${prefix}${message}`, meta)
}
}),
parsedCronItems: parseCronItems(WIKI.data.jobs.map(j => ({
...j,
identifier: uuid()
}))),
taskList: {
simple: async (payload, helpers) => {
// TODO: Handle task
},
background: async (payload, helpers) => {
try {
await this.pool.execute({
id: helpers.job.id,
name: payload.name,
data: payload.data
})
} catch (err) {
helpers.logger.warn(`Failed job: ${err.message}`)
throw err
}
}
}
this.boss.complete(job.id)
})
WIKI.logger.info('Scheduler: [ STARTED ]')
},
async stop () {
WIKI.logger.info('Stopping Scheduler...')
await this.boss.stop({ timeout: 5000 })
await this.pool.destroy()
await this.runner.stop()
WIKI.logger.info('Scheduler: [ STOPPED ]')
},
async registerScheduledJobs () {
for (const [key, job] of Object.entries(WIKI.data.jobs)) {
if (job.schedule) {
WIKI.logger.debug(`Scheduling regular job ${key}...`)
await this.boss.schedule(`wk-${key}`, job.schedule)
}
}
}
}
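Because run() returns a graphile-worker Runner, ad-hoc jobs can also be queued outside the parsed cron items by calling runner.addJob with the 'background' task, so they execute in the same thread pool. A minimal sketch (job name and call site illustrative, not part of this commit):

// Assumes WIKI.scheduler.start() has already resolved:
await WIKI.scheduler.runner.addJob('background', {
  name: 'purgeUploads', // forwarded to the DynamicThreadPool by the 'background' task above
  data: {}
})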
const _ = require('lodash')
const { createApolloFetch } = require('apollo-fetch')
/* global WIKI */
module.exports = async (localeCode) => {
WIKI.logger.info(`Fetching locale ${localeCode} from Graph endpoint...`)
try {
const apollo = createApolloFetch({
uri: WIKI.config.graphEndpoint
})
const respStrings = await apollo({
query: `query ($code: String!) {
localization {
strings(code: $code) {
key
value
}
}
}`,
variables: {
code: localeCode
}
})
const strings = _.get(respStrings, 'data.localization.strings', [])
let lcObj = {}
_.forEach(strings, row => {
if (_.includes(row.key, '::')) { return }
if (_.isEmpty(row.value)) {
row.value = row.key
}
_.set(lcObj, row.key.replace(':', '.'), row.value)
})
const locales = await WIKI.cache.get('locales')
if (locales) {
const currentLocale = _.find(locales, ['code', localeCode]) || {}
const existingLocale = await WIKI.models.locales.query().where('code', localeCode).first()
if (existingLocale) {
await WIKI.models.locales.query().patch({
strings: lcObj
}).where('code', localeCode)
} else {
await WIKI.models.locales.query().insert({
code: localeCode,
strings: lcObj,
isRTL: currentLocale.isRTL,
name: currentLocale.name,
nativeName: currentLocale.nativeName,
availability: currentLocale.availability
})
}
} else {
throw new Error('Failed to fetch cached locales list! Restart server to resolve this issue.')
}
await WIKI.lang.refreshNamespaces()
WIKI.logger.info(`Fetching locale ${localeCode} from Graph endpoint: [ COMPLETED ]`)
} catch (err) {
WIKI.logger.error(`Fetching locale ${localeCode} from Graph endpoint: [ FAILED ]`)
WIKI.logger.error(err.message)
}
}
/* global WIKI */
const Promise = require('bluebird')
const fs = require('fs-extra')
const moment = require('moment')
const path = require('path')
module.exports = async () => {
WIKI.logger.info('Purging orphaned upload files...')
try {
const uplTempPath = path.resolve(WIKI.ROOTPATH, WIKI.config.dataPath, 'uploads')
await fs.ensureDir(uplTempPath)
const ls = await fs.readdir(uplTempPath)
const fifteenAgo = moment().subtract(15, 'minutes')
await Promise.map(ls, (f) => {
return fs.stat(path.join(uplTempPath, f)).then((s) => { return { filename: f, stat: s } })
}).filter((s) => { return s.stat.isFile() }).then((arrFiles) => {
return Promise.map(arrFiles, (f) => {
if (moment(f.stat.ctime).isBefore(fifteenAgo, 'minute')) {
return fs.unlink(path.join(uplTempPath, f.filename))
}
})
})
WIKI.logger.info('Purging orphaned upload files: [ COMPLETED ]')
} catch (err) {
WIKI.logger.error('Purging orphaned upload files: [ FAILED ]')
WIKI.logger.error(err.message)
}
}
const _ = require('lodash')
/* global WIKI */
module.exports = async (pageId) => {
WIKI.logger.info(`Rebuilding page tree...`)
try {
WIKI.models = require('../core/db').init()
await WIKI.configSvc.loadFromDb()
await WIKI.configSvc.applyFlags()
const pages = await WIKI.models.pages.query().select('id', 'path', 'localeCode', 'title', 'isPrivate', 'privateNS').orderBy(['localeCode', 'path'])
let tree = []
let pik = 0
for (const page of pages) {
const pagePaths = page.path.split('/')
let currentPath = ''
let depth = 0
let parentId = null
let ancestors = []
for (const part of pagePaths) {
depth++
const isFolder = (depth < pagePaths.length)
currentPath = currentPath ? `${currentPath}/${part}` : part
const found = _.find(tree, {
localeCode: page.localeCode,
path: currentPath
})
if (!found) {
pik++
tree.push({
id: pik,
localeCode: page.localeCode,
path: currentPath,
depth: depth,
title: isFolder ? part : page.title,
isFolder: isFolder,
isPrivate: !isFolder && page.isPrivate,
privateNS: !isFolder ? page.privateNS : null,
parent: parentId,
pageId: isFolder ? null : page.id,
ancestors: JSON.stringify(ancestors)
})
parentId = pik
} else if (isFolder && !found.isFolder) {
found.isFolder = true
parentId = found.id
} else {
parentId = found.id
}
ancestors.push(parentId)
}
}
await WIKI.models.knex.table('pageTree').truncate()
if (tree.length > 0) {
// -> Save in chunks, because of per query max parameters (35k Postgres, 2k MSSQL, 1k for SQLite)
if ((WIKI.config.db.type !== 'sqlite')) {
for (const chunk of _.chunk(tree, 100)) {
await WIKI.models.knex.table('pageTree').insert(chunk)
}
} else {
for (const chunk of _.chunk(tree, 60)) {
await WIKI.models.knex.table('pageTree').insert(chunk)
}
}
}
await WIKI.models.knex.destroy()
WIKI.logger.info(`Rebuilding page tree: [ COMPLETED ]`)
} catch (err) {
WIKI.logger.error(`Rebuilding page tree: [ FAILED ]`)
WIKI.logger.error(err.message)
// exit process with error code
throw err
}
}
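To make the loop above concrete, a single page at path docs/install (locale en, title 'Installation') produces one folder row and one page row in pageTree, roughly as follows (abridged; ids and pageId illustrative):

[
  { id: 1, localeCode: 'en', path: 'docs', depth: 1, title: 'docs', isFolder: true, parent: null, pageId: null, ancestors: '[]' },
  { id: 2, localeCode: 'en', path: 'docs/install', depth: 2, title: 'Installation', isFolder: false, parent: 1, pageId: 42, ancestors: '[1]' }
]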
const _ = require('lodash')
const cheerio = require('cheerio')
/* global WIKI */
module.exports = async (pageId) => {
WIKI.logger.info(`Rendering page ID ${pageId}...`)
try {
WIKI.models = require('../core/db').init()
await WIKI.configSvc.loadFromDb()
await WIKI.configSvc.applyFlags()
const page = await WIKI.models.pages.getPageFromDb(pageId)
if (!page) {
throw new Error('Invalid Page Id')
}
await WIKI.models.renderers.fetchDefinitions()
const pipeline = await WIKI.models.renderers.getRenderingPipeline(page.contentType)
let output = page.content
if (_.isEmpty(page.content)) {
await WIKI.models.knex.destroy()
WIKI.logger.warn(`Failed to render page ID ${pageId} because content was empty: [ FAILED ]`)
}
for (let core of pipeline) {
const renderer = require(`../modules/rendering/${_.kebabCase(core.key)}/renderer.js`)
output = await renderer.render.call({
config: core.config,
children: core.children,
page: page,
input: output
})
}
// Parse TOC
const $ = cheerio.load(output)
let isStrict = $('h1').length > 0 // <- Allows for documents using H2 as top level
let toc = { root: [] }
$('h1,h2,h3,h4,h5,h6').each((idx, el) => {
const depth = _.toSafeInteger(el.name.substring(1)) - (isStrict ? 1 : 2)
let leafPathError = false
const leafPath = _.reduce(_.times(depth), (curPath, curIdx) => {
if (_.has(toc, curPath)) {
const lastLeafIdx = _.get(toc, curPath).length - 1
if (lastLeafIdx >= 0) {
curPath = `${curPath}[${lastLeafIdx}].children`
} else {
leafPathError = true
}
}
return curPath
}, 'root')
if (leafPathError) { return }
const leafSlug = $('.toc-anchor', el).first().attr('href')
$('.toc-anchor', el).remove()
_.get(toc, leafPath).push({
title: _.trim($(el).text()),
anchor: leafSlug,
children: []
})
})
// Save to DB
await WIKI.models.pages.query()
.patch({
render: output,
toc: JSON.stringify(toc.root)
})
.where('id', pageId)
// Save to cache
await WIKI.models.pages.savePageToCache({
...page,
render: output,
toc: JSON.stringify(toc.root)
})
await WIKI.models.knex.destroy()
WIKI.logger.info(`Rendering page ID ${pageId}: [ COMPLETED ]`)
} catch (err) {
WIKI.logger.error(`Rendering page ID ${pageId}: [ FAILED ]`)
WIKI.logger.error(err.message)
// exit process with error code
throw err
}
}
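For reference, the TOC assembled above nests entries under toc.root; with isStrict (an H1 present), an H1 followed by two H2s yields roughly the following structure (titles and anchors illustrative):

[
  {
    title: 'Introduction',
    anchor: '#introduction',
    children: [
      { title: 'Goals', anchor: '#goals', children: [] },
      { title: 'Non-Goals', anchor: '#non-goals', children: [] }
    ]
  }
]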
const fs = require('fs-extra')
const { JSDOM } = require('jsdom')
const createDOMPurify = require('dompurify')
/* global WIKI */
module.exports = async (svgPath) => {
WIKI.logger.info(`Sanitizing SVG file upload...`)
try {
let svgContents = await fs.readFile(svgPath, 'utf8')
const window = new JSDOM('').window
const DOMPurify = createDOMPurify(window)
svgContents = DOMPurify.sanitize(svgContents)
await fs.writeFile(svgPath, svgContents)
WIKI.logger.info(`Sanitized SVG file upload: [ COMPLETED ]`)
} catch (err) {
WIKI.logger.error(`Failed to sanitize SVG file upload: [ FAILED ]`)
WIKI.logger.error(err.message)
throw err
}
}
const _ = require('lodash')
const { createApolloFetch } = require('apollo-fetch')
/* global WIKI */
module.exports = async () => {
WIKI.logger.info('Syncing locales with Graph endpoint...')
try {
const apollo = createApolloFetch({
uri: WIKI.config.graphEndpoint
})
// -> Fetch locales list
const respList = await apollo({
query: `{
localization {
locales {
availability
code
name
nativeName
isRTL
createdAt
updatedAt
}
}
}`
})
const locales = _.sortBy(_.get(respList, 'data.localization.locales', []), 'name').map(lc => ({...lc, isInstalled: (lc.code === 'en')}))
WIKI.cache.set('locales', locales)
// -> Download locale strings
if (WIKI.config.lang.autoUpdate) {
const activeLocales = WIKI.config.lang.namespacing ? WIKI.config.lang.namespaces : [WIKI.config.lang.code]
for (const currentLocale of activeLocales) {
const localeInfo = _.find(locales, ['code', currentLocale])
const respStrings = await apollo({
query: `query ($code: String!) {
localization {
strings(code: $code) {
key
value
}
}
}`,
variables: {
code: currentLocale
}
})
const strings = _.get(respStrings, 'data.localization.strings', [])
let lcObj = {}
_.forEach(strings, row => {
if (_.includes(row.key, '::')) { return }
if (_.isEmpty(row.value)) {
row.value = row.key
}
_.set(lcObj, row.key.replace(':', '.'), row.value)
})
await WIKI.models.locales.query().update({
code: currentLocale,
strings: lcObj,
isRTL: localeInfo.isRTL,
name: localeInfo.name,
nativeName: localeInfo.nativeName,
availability: localeInfo.availability
}).where('code', currentLocale)
WIKI.logger.info(`Pulled latest locale updates for ${localeInfo.name} from Graph endpoint: [ COMPLETED ]`)
}
}
await WIKI.lang.refreshNamespaces()
WIKI.logger.info('Syncing locales with Graph endpoint: [ COMPLETED ]')
} catch (err) {
WIKI.logger.error('Syncing locales with Graph endpoint: [ FAILED ]')
WIKI.logger.error(err.message)
}
}
const _ = require('lodash')
const { createApolloFetch } = require('apollo-fetch')
/* global WIKI */
module.exports = async () => {
WIKI.logger.info(`Fetching latest updates from Graph endpoint...`)
try {
const apollo = createApolloFetch({
uri: WIKI.config.graphEndpoint
})
const resp = await apollo({
query: `query ($channel: ReleaseChannel!, $version: String!) {
releases {
checkForUpdates(channel: $channel, version: $version) {
channel
version
releaseDate
minimumVersionRequired
minimumNodeRequired
}
}
}`,
variables: {
channel: WIKI.config.channel,
version: WIKI.version
}
})
const info = _.get(resp, 'data.releases.checkForUpdates', false)
if (info) {
WIKI.system.updates = info
}
WIKI.logger.info(`Fetching latest updates from Graph endpoint: [ COMPLETED ]`)
} catch (err) {
WIKI.logger.error(`Fetching latest updates from Graph endpoint: [ FAILED ]`)
WIKI.logger.error(err.message)
}
}
const _ = require('lodash')
/* global WIKI */
module.exports = async (targetKey) => {
WIKI.logger.info(`Syncing with storage target ${targetKey}...`)
try {
const target = _.find(WIKI.models.storage.targets, ['key', targetKey])
if (target) {
await target.fn.sync()
WIKI.logger.info(`Syncing with storage target ${targetKey}: [ COMPLETED ]`)
await WIKI.models.storage.query().patch({
state: {
status: 'operational',
message: '',
lastAttempt: new Date().toISOString()
}
}).where('key', targetKey)
} else {
throw new Error('Invalid storage target. Unable to perform sync.')
}
} catch (err) {
WIKI.logger.error(`Syncing with storage target ${targetKey}: [ FAILED ]`)
WIKI.logger.error(err.message)
await WIKI.models.storage.query().patch({
state: {
status: 'error',
message: err.message,
lastAttempt: new Date().toISOString()
}
}).where('key', targetKey)
}
}
const path = require('node:path')
const fs = require('fs-extra')
const { DateTime } = require('luxon')
/* global WIKI */
module.exports = async (payload, helpers) => {
helpers.logger.info('Purging orphaned upload files...')
try {
const uplTempPath = path.resolve(WIKI.ROOTPATH, WIKI.config.dataPath, 'uploads')
await fs.ensureDir(uplTempPath)
const ls = await fs.readdir(uplTempPath)
const fifteenAgo = DateTime.now().minus({ minutes: 15 })
for (const f of ls) {
const stat = await fs.stat(path.join(uplTempPath, f))
if (stat.isFile() && DateTime.fromJSDate(stat.ctime) < fifteenAgo) {
await fs.unlink(path.join(uplTempPath, f))
}
}
helpers.logger.info('Purging orphaned upload files: [ COMPLETED ]')
} catch (err) {
helpers.logger.error('Purging orphaned upload files: [ FAILED ]')
helpers.logger.error(err.message)
}
}
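This file uses graphile-worker's task signature (payload, helpers); for the runner to pick it up it would have to be registered in the taskList passed to run() in scheduler.js, along the lines of the following sketch (module path illustrative, not part of this commit):

// In scheduler.js, alongside the 'simple' and 'background' tasks (illustrative):
taskList: {
  purgeUploads: require('../tasks/purge-uploads')
}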
const { ThreadWorker } = require('poolifier')
module.exports = new ThreadWorker(async (job) => {
// TODO: Call external task file
return { ok: true }
}, { async: true })
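The ThreadWorker body is still a stub; a plausible completion (purely illustrative, not part of this commit) would dispatch on the name/data payload sent by the scheduler's 'background' task:

const path = require('node:path')
const { ThreadWorker } = require('poolifier')

module.exports = new ThreadWorker(async (job) => {
  // Hypothetical dispatch: resolve a task module by job name and run it with the job data.
  const task = require(path.join(__dirname, 'tasks', job.name))
  return task(job.data)
}, { async: true })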
This diff was suppressed by a .gitattributes entry.