const path = require('path')
const Bull = require('bull')
const Promise = require('bluebird')
const _ = require('lodash')

/* global WIKI */

module.exports = {
9
  job: {},
NGPixel's avatar
NGPixel committed
10
  init() {
11 12 13
    _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
      this.job[queueName] = new Bull(queueName, {
        prefix: `q-${WIKI.config.ha.uid}`,
14
        redis: WIKI.config.redis
NGPixel's avatar
NGPixel committed
15
      })
16
      this.job[queueName].process(path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
NGPixel's avatar
NGPixel committed
17 18 19
    })
    return this
  },
20 21 22 23 24 25 26
  start() {
    _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
      if (queueParams.onInit) {
        this.job[queueName].add({}, {
          removeOnComplete: true
        })
      }
27 28 29 30 31 32
      if (queueParams.cron) {
        this.job[queueName].add({}, {
          repeat: { cron: queueParams.cron },
          removeOnComplete: true
        })
      }
33 34 35 36
    })
  },
  async clean() {
    return Promise.each(_.keys(WIKI.data.jobs), queueName => {
NGPixel's avatar
NGPixel committed
37
      return new Promise((resolve, reject) => {
38
        let keyStream = WIKI.redis.scanStream({
39
          match: `q-${WIKI.config.ha.uid}:${queueName}:*`
NGPixel's avatar
NGPixel committed
40 41 42
        })
        keyStream.on('data', resultKeys => {
          if (resultKeys.length > 0) {
43
            WIKI.redis.del(resultKeys)
NGPixel's avatar
NGPixel committed
44 45 46 47 48
          }
        })
        keyStream.on('end', resolve)
      })
    }).then(() => {
49
      WIKI.logger.info('Purging old queue jobs: [ OK ]')
NGPixel's avatar
NGPixel committed
50
    }).return(true).catch(err => {
51
      WIKI.logger.error(err)
NGPixel's avatar
NGPixel committed
52 53 54
    })
  }
}