From 010b1e4bd1ac0eecf7a6834e78a22887384a64cb Mon Sep 17 00:00:00 2001 From: Anurag Agarwal Date: Sat, 30 Nov 2019 15:51:48 +0530 Subject: [PATCH 1/7] Refactors code to support inheritance --- src/server/queue/bee.js | 91 ++++++++++++++++ src/server/queue/bull.js | 101 ++++++++++++++++++ src/server/queue/index.js | 52 ++------- src/server/queue/job.js | 17 +++ src/server/queue/jobData.js | 16 +++ src/server/queue/queue.js | 43 ++++++++ src/server/views/api/bulkAction.js | 16 ++- src/server/views/api/jobRemove.js | 24 +---- src/server/views/api/jobRetry.js | 25 +---- src/server/views/api/performAction.js | 30 ++++++ src/server/views/dashboard/jobDetails.js | 13 +-- src/server/views/dashboard/queueDetails.js | 8 +- .../views/dashboard/queueJobsByState.js | 62 ++--------- .../dashboard/templates/queueJobsByState.hbs | 2 +- src/server/views/helpers/queueHelpers.js | 14 +-- .../views/partials/dashboard/jobDetails.hbs | 29 ++--- 16 files changed, 339 insertions(+), 204 deletions(-) create mode 100644 src/server/queue/bee.js create mode 100644 src/server/queue/bull.js create mode 100644 src/server/queue/job.js create mode 100644 src/server/queue/jobData.js create mode 100644 src/server/queue/queue.js create mode 100644 src/server/views/api/performAction.js diff --git a/src/server/queue/bee.js b/src/server/queue/bee.js new file mode 100644 index 00000000..ebd23c29 --- /dev/null +++ b/src/server/queue/bee.js @@ -0,0 +1,91 @@ +const Queue = require('./queue'); +const Job = require('./job'); +const JobData = require('./jobData'); + +class BeeJob extends Job { + constructor(job) { + super(job); + } + + async remove() { + await this._job.remove(); + } + + async getStatus() { + return Promise.resolve(this._job.status); + } + + async toJSON() { + const {id, progress, data, options: {timestamp, stacktraces: stacktrace, delay}} = this._job; + return new JobData({id, progress, data, timestamp, stacktrace, delay}); + } +} + +const VALID_STATES = ['waiting', 'active', 'succeeded', 'failed', 'delayed']; +const SUPPORTED_ACTIONS = ['remove']; + +module.exports = class BeeQueue extends Queue { + constructor(queueConfig) { + const {name} = queueConfig; + const options = BeeQueue.parseConfig(queueConfig); + const queue = new BeeQueue(name, options); + super(queue); + } + + static parseConfig(queueConfig) { + const options = { + redis: this.parseRedisConfig(queueConfig), + isWorker: false, + getEvents: false, + sendEvents: false, + storeJobs: false, + }; + const {prefix} = queueConfig; + if (prefix) options.prefix = prefix; + return options; + } + + async getJob(id) { + const job = this._queue.getJob(id); + return new BeeJob(job); + } + + async getJobCounts() { + const jobCounts = this._queue.checkHealth(); + delete jobCounts.newestJob; + return jobCounts; + } + + async getJobs(state, start, size) { + const page = {}; + + if (['failed', 'succeeded'].includes(state)) { + page.size = size; + } else { + page.start = start; + page.end = start + size - 1; + } + + let jobs = await this._queue.getJobs(state, page); + // Filter out Bee jobs that have already been removed by the time the promise resolves + jobs = jobs.filter((job) => job); + return jobs.map((j) => new BeeJob(j)); + } + + async addJob(data) { + const job = await this._queue.createJob(data).save(); + return new BeeJob(job); + } + + isValidState(state) { + return VALID_STATES.includes(state); + } + + isActionSupported(action) { + return SUPPORTED_ACTIONS.includes(action); + } + + isPaginationSupported(state) { + return state !== 'succeeded' && state !== 'failed'; + } +}; diff --git a/src/server/queue/bull.js b/src/server/queue/bull.js new file mode 100644 index 00000000..0d17ddcf --- /dev/null +++ b/src/server/queue/bull.js @@ -0,0 +1,101 @@ +const {capitalize} = require('lodash'); +const Bull = require('bull'); +const Queue = require('./queue'); +const Job = require('./job'); +const JobData = require('./jobData'); + +const VALID_STATES = ['waiting', 'active', 'completed', 'failed', 'delayed']; +const SUPPORTED_ACTIONS = ['remove', 'retry']; + +class BullJob extends Job { + constructor(job) { + super(job); + } + + async remove() { + await this._job.remove(); + } + + async retry() { + await this._job.retry(); + } + + async getStatus() { + return this._job.getState(); + } + + + async toJSON() { + const { + id, + name, + data, + attemptsMade, + failedReason, + stacktrace, + returnvalue: returnValue, + timestamp, + delay, + progress + } = this._job.toJSON(); + return new JobData({ + id, + name, + data, + attemptsMade, + failedReason, + stacktrace, + timestamp, + delay, + progress, + returnValue, + }); + } +} + +module.exports = class BullQueue extends Queue { + constructor(queueConfig) { + const {name} = queueConfig; + const options = BullQueue.parseConfig(queueConfig); + const queue = Bull(name, options); + super(queue); + } + + static parseConfig(queueConfig) { + const options = {redis: this.parseRedisConfig(queueConfig)}; + const {createClient, prefix} = queueConfig; + if (createClient) options.createClient = createClient; + if (prefix) options.prefix = prefix; + return options; + } + + async getJob(id) { + const job = await this._queue.getJob(id); + return new BullJob(job); + } + + async getJobCounts() { + return this._queue.getJobCounts(); + } + + async getJobs(state, start, size) { + const jobs = await this._queue[`get${capitalize(state)}`](start, start + size - 1); + return jobs.map((j) => new BullJob(j)); + } + + async addJob(data) { + const job = await this._queue.add(data, { + removeOnComplete: false, + removeOnFail: false + }); + return new BullJob(job); + } + + isValidState(state) { + return VALID_STATES.includes(state); + } + + isActionSupported(action) { + return SUPPORTED_ACTIONS.includes(action); + } +}; diff --git a/src/server/queue/index.js b/src/server/queue/index.js index e9bd20fe..7e3a2405 100644 --- a/src/server/queue/index.js +++ b/src/server/queue/index.js @@ -1,6 +1,6 @@ const _ = require('lodash'); -const Bull = require('bull'); -const Bee = require('bee-queue'); +const BullQueue = require('./bull'); +const BeeQueue = require('./bee'); class Queues { constructor(config) { @@ -38,35 +38,12 @@ class Queues { return this._queues[queueHost][queueName]; } - const { type, name, port, host, db, password, prefix, url, redis, tls } = queueConfig; - - const redisHost = { host }; - if (password) redisHost.password = password; - if (port) redisHost.port = port; - if (db) redisHost.db = db; - if (tls) redisHost.tls = tls; - - const isBee = type === 'bee'; - - const options = { - redis: redis || url || redisHost - }; - if (prefix) options.prefix = prefix; - + const {type} = queueConfig; let queue; - if (isBee) { - _.extend(options, { - isWorker: false, - getEvents: false, - sendEvents: false, - storeJobs: false - }); - - queue = new Bee(name, options); - queue.IS_BEE = true; + if (type === 'bee') { + queue = new BeeQueue(queueConfig); } else { - if (queueConfig.createClient) options.createClient = queueConfig.createClient; - queue = new Bull(name, options); + queue = new BullQueue(queueConfig); } this._queues[queueHost] = this._queues[queueHost] || {}; @@ -74,23 +51,6 @@ class Queues { return queue; } - - /** - * Creates and adds a job with the given `data` to the given `queue`. - * - * @param {Object} queue A bee or bull queue class - * @param {Object} data The data to be used within the job - */ - async set(queue, data) { - if (queue.IS_BEE) { - return queue.createJob(data).save(); - } else { - return queue.add(data, { - removeOnComplete: false, - removeOnFail: false - }); - } - } } module.exports = Queues; diff --git a/src/server/queue/job.js b/src/server/queue/job.js new file mode 100644 index 00000000..2a312f0a --- /dev/null +++ b/src/server/queue/job.js @@ -0,0 +1,17 @@ +module.exports = class Job { + constructor(job) { + this._job = job; + if (new.target === Job) { + throw new TypeError("Cannot construct Job instances directly"); + } + } + + async remove() { + } + + async getStatus() { + } + + async toJSON() { + } +}; diff --git a/src/server/queue/jobData.js b/src/server/queue/jobData.js new file mode 100644 index 00000000..7561819f --- /dev/null +++ b/src/server/queue/jobData.js @@ -0,0 +1,16 @@ +module.exports = class JobData { + constructor({id, name, data, stacktrace, timestamp, progress, delay, attemptsMade, returnValue, failedReason}) { + this.id = id; + this.name = name; + this.data = data; + this.progress = progress; + this.attemptsMade = attemptsMade; + this.returnValue = returnValue; + this.failedReason = failedReason; + this.options = { + stacktrace, + timestamp, + delay, + }; + } +}; diff --git a/src/server/queue/queue.js b/src/server/queue/queue.js new file mode 100644 index 00000000..de850975 --- /dev/null +++ b/src/server/queue/queue.js @@ -0,0 +1,43 @@ +module.exports = class Queue { + constructor(queue) { + this._queue = queue; + if (new.target === Queue) { + throw new TypeError("Cannot construct Queue instances directly"); + } + } + + static parseRedisConfig({port, host, db, password, url, redis, tls}) { + const redisHost = {host}; + if (password) redisHost.password = password; + if (port) redisHost.port = port; + if (db) redisHost.db = db; + if (tls) redisHost.tls = tls; + return redis || url || redisHost; + } + + get redisClient() { + return this._queue.client; + } + + async getJob(id) { + } + + async getJobCounts() { + } + + async getJobs(state, start, size) { + } + + async addJob(data, options) { + } + + isValidState(state) { + } + + isActionSupported(action) { + } + + isPaginationSupported(_state) { + return true; + } +}; diff --git a/src/server/views/api/bulkAction.js b/src/server/views/api/bulkAction.js index 83f364e6..9f1d3516 100644 --- a/src/server/views/api/bulkAction.js +++ b/src/server/views/api/bulkAction.js @@ -1,21 +1,19 @@ const _ = require('lodash'); -const ACTIONS = ['remove', 'retry']; - function bulkAction(action) { return async function handler(req, res) { - if (!_.includes(ACTIONS, action)) { - res.status(401).send({ - error: 'unauthorized action', - details: `action ${action} not permitted` - }); - } - const { queueName, queueHost } = req.params; const {Queues} = req.app.locals; const queue = await Queues.get(queueName, queueHost); if (!queue) return res.status(404).send({error: 'queue not found'}); + if (!queue.isActionSupported(action)) { + return res.status(401).send({ + error: 'unauthorized action', + details: `queue does not support action ${action}` + }); + } + const {jobs} = req.body; try { diff --git a/src/server/views/api/jobRemove.js b/src/server/views/api/jobRemove.js index 8a5abfdd..4ddca41d 100644 --- a/src/server/views/api/jobRemove.js +++ b/src/server/views/api/jobRemove.js @@ -1,23 +1 @@ -async function handler(req, res) { - const { queueName, queueHost, id } = req.params; - - const {Queues} = req.app.locals; - const queue = await Queues.get(queueName, queueHost); - if (!queue) return res.status(404).send({error: 'queue not found'}); - - const job = await queue.getJob(id); - if (!job) return res.status(404).send({error: 'job not found'}); - - try { - await job.remove(); - return res.sendStatus(200); - } catch (e) { - const body = { - error: 'queue error', - details: e.stack - }; - return res.status(500).send(body); - } -} - -module.exports = handler; +module.exports = require('./performAction')('remove'); diff --git a/src/server/views/api/jobRetry.js b/src/server/views/api/jobRetry.js index c8c10c02..8c8bcb58 100644 --- a/src/server/views/api/jobRetry.js +++ b/src/server/views/api/jobRetry.js @@ -1,24 +1 @@ -async function handler(req, res) { - const { queueName, queueHost, id } = req.params; - - const {Queues} = req.app.locals; - - const queue = await Queues.get(queueName, queueHost); - if (!queue) return res.status(404).send({error: 'queue not found'}); - - const job = await queue.getJob(id); - if (!job) return res.status(404).send({error: 'job not found'}); - - try { - await job.retry(); - return res.sendStatus(200); - } catch (e) { - const body = { - error: 'queue error', - details: e.stack - }; - return res.status(500).send(body); - } -} - -module.exports = handler; +module.exports = require('./performAction')('retry'); diff --git a/src/server/views/api/performAction.js b/src/server/views/api/performAction.js new file mode 100644 index 00000000..a6f1cea5 --- /dev/null +++ b/src/server/views/api/performAction.js @@ -0,0 +1,30 @@ +module.exports = function performAction(action) { + return async function handler(req, res) { + const {queueName, queueHost, id} = req.params; + + const {Queues} = req.app.locals; + const queue = await Queues.get(queueName, queueHost); + if (!queue) return res.status(404).send({error: 'queue not found'}); + + if (!queue.isActionSupported(action)) { + return res.status(401).send({ + error: 'unauthorized action', + details: `queue does not support action ${action}` + }); + } + + const job = await queue.getJob(id); + if (!job) return res.status(404).send({error: 'job not found'}); + + try { + await job[action](); + return res.sendStatus(200); + } catch (e) { + const body = { + error: 'queue error', + details: e.stack + }; + return res.status(500).send(body); + } + }; +}; diff --git a/src/server/views/dashboard/jobDetails.js b/src/server/views/dashboard/jobDetails.js index 5fe9f0b5..dab87d10 100644 --- a/src/server/views/dashboard/jobDetails.js +++ b/src/server/views/dashboard/jobDetails.js @@ -1,5 +1,4 @@ const _ = require('lodash'); -const util = require('util'); async function handler(req, res) { const { queueName, queueHost, id } = req.params; @@ -12,26 +11,22 @@ async function handler(req, res) { const job = await queue.getJob(id); if (!job) return res.status(404).render('dashboard/templates/jobNotFound', {basePath, id, queueName, queueHost}); + const jobData = await job.toJSON(); if (json === 'true') { // Omit these private and non-stringifyable properties to avoid circular // references parsing errors. - return res.json(_.omit(job, 'domain', 'queue', '_events', '_eventsCount')); + return res.json(jobData); } - let jobState; - if (queue.IS_BEE) { - jobState = job.status; - } else { - jobState = await job.getState(); - } + const jobState = await job.getStatus(); return res.render('dashboard/templates/jobDetails', { basePath, queueName, queueHost, jobState, - job + job: jobData }); } diff --git a/src/server/views/dashboard/queueDetails.js b/src/server/views/dashboard/queueDetails.js index 7ea101fe..f0bf1ef7 100644 --- a/src/server/views/dashboard/queueDetails.js +++ b/src/server/views/dashboard/queueDetails.js @@ -7,13 +7,7 @@ async function handler(req, res) { const basePath = req.baseUrl; if (!queue) return res.status(404).render('dashboard/templates/queueNotFound', {basePath, queueName, queueHost}); - let jobCounts; - if (queue.IS_BEE) { - jobCounts = await queue.checkHealth(); - delete jobCounts.newestJob; - } else { - jobCounts = await queue.getJobCounts(); - } + const jobCounts = await queue.getJobCounts(); const stats = await QueueHelpers.getStats(queue); return res.render('dashboard/templates/queueDetails', { diff --git a/src/server/views/dashboard/queueJobsByState.js b/src/server/views/dashboard/queueJobsByState.js index 8a76f186..b07b8107 100644 --- a/src/server/views/dashboard/queueJobsByState.js +++ b/src/server/views/dashboard/queueJobsByState.js @@ -1,18 +1,4 @@ const _ = require('lodash'); -const { BEE_STATES, BULL_STATES } = require('../helpers/queueHelpers'); - -/** - * Determines if the requested job state lookup is valid. - * - * @param {String} state - * @param {Boolean} isBee States vary between bull and bee - * - * @return {Boolean} - */ -function isValidState(state, isBee) { - const validStates = isBee ? BEE_STATES : BULL_STATES; - return _.includes(validStates, state); -} async function handler(req, res) { if (req.params.ext === 'json') return _json(req, res); @@ -32,16 +18,10 @@ async function _json(req, res) { const queue = await Queues.get(queueName, queueHost); if (!queue) return res.status(404).json({ message: 'Queue not found' }); - if (!isValidState(state, queue.IS_BEE)) return res.status(400).json({ message: `Invalid state requested: ${state}` }); + if (!queue.isValidState(state)) return res.status(400).json({ message: `Invalid state requested: ${state}` }); - let jobs; - if (queue.IS_BEE) { - jobs = await queue.getJobs(state, { size: 1000 }); - jobs = jobs.map((j) => _.pick(j, 'id', 'progress', 'data', 'options', 'status')); - } else { - jobs = await queue[`get${_.capitalize(state)}`](0, 1000); - jobs = jobs.map((j) => j.toJSON()); - } + let jobs = await queue.getJobs(state, 0, 1000); + jobs = jobs.map((j) => j.toJSON()); const filename = `${queueName}-${state}-dump.json`; @@ -63,40 +43,16 @@ async function _html(req, res) { const basePath = req.baseUrl; if (!queue) return res.status(404).render('dashboard/templates/queueNotFound', {basePath, queueName, queueHost}); - if (!isValidState(state, queue.IS_BEE)) return res.status(400).json({ message: `Invalid state requested: ${state}` }); - - let jobCounts; - if (queue.IS_BEE) { - jobCounts = await queue.checkHealth(); - delete jobCounts.newestJob; - } else { - jobCounts = await queue.getJobCounts(); - } + if (!queue.isValidState(state)) return res.status(400).json({ message: `Invalid state requested: ${state}` }); + const jobCounts = await queue.getJobCounts(); const page = parseInt(req.query.page, 10) || 1; const pageSize = parseInt(req.query.pageSize, 10) || 100; const startId = (page - 1) * pageSize; - const endId = startId + pageSize - 1; - - let jobs; - if (queue.IS_BEE) { - const page = {}; - - if (['failed', 'succeeded'].includes(state)) { - page.size = pageSize; - } else { - page.start = startId; - page.end = endId; - } - - jobs = await queue.getJobs(state, page); - - // Filter out Bee jobs that have already been removed by the time the promise resolves - jobs = jobs.filter((job) => job); - } else { - jobs = await queue[`get${_.capitalize(state)}`](startId, endId); - } + let jobs = await queue.getJobs(state, startId, pageSize); + const jobPromises = jobs.map((j) => j.toJSON()); + jobs = await Promise.all(jobPromises); let pages = _.range(page - 6, page + 7) .filter((page) => page >= 1); @@ -112,7 +68,7 @@ async function _html(req, res) { state, jobs, jobsInStateCount: jobCounts[state], - disablePagination: queue.IS_BEE && (state === 'succeeded' || state === 'failed'), + disablePagination: !queue.isPaginationSupported(state), currentPage: page, pages, pageSize, diff --git a/src/server/views/dashboard/templates/queueJobsByState.hbs b/src/server/views/dashboard/templates/queueJobsByState.hbs index d647abf9..6577d51e 100644 --- a/src/server/views/dashboard/templates/queueJobsByState.hbs +++ b/src/server/views/dashboard/templates/queueJobsByState.hbs @@ -39,7 +39,7 @@ {{else}} - Bee-queue does not support pagination for {{ state }} queues — currently displaying up to {{ pageSize }} jobs. To change count, use "Size" dropdown. + Queue implementation does not support pagination for {{ state }} queues — currently displaying up to {{ pageSize }} jobs. To change count, use "Size" dropdown. {{/unless}}
diff --git a/src/server/views/helpers/queueHelpers.js b/src/server/views/helpers/queueHelpers.js index 75a36a85..6812ff24 100644 --- a/src/server/views/helpers/queueHelpers.js +++ b/src/server/views/helpers/queueHelpers.js @@ -30,9 +30,9 @@ function formatBytes(num) { const Helpers = { getStats: async function(queue) { - await queue.client.info(); // update queue.client.serverInfo + await queue.redisClient.info(); // update queue.client.serverInfo - const stats = _.pickBy(queue.client.serverInfo, (value, key) => _.includes(this._usefulMetrics, key)); + const stats = _.pickBy(queue.redisClient.serverInfo, (value, key) => _.includes(this._usefulMetrics, key)); stats.used_memory = formatBytes(parseInt(stats.used_memory, 10)); stats.total_system_memory = formatBytes(parseInt(stats.total_system_memory, 10)); return stats; @@ -46,16 +46,6 @@ const Helpers = { 'connected_clients', 'blocked_clients' ], - - /** - * Valid states for a job in bee queue - */ - BEE_STATES: ['waiting', 'active', 'succeeded', 'failed', 'delayed'], - - /** - * Valid states for a job in bull queue - */ - BULL_STATES: ['waiting', 'active', 'completed', 'failed', 'delayed'] }; module.exports = Helpers; diff --git a/src/server/views/partials/dashboard/jobDetails.hbs b/src/server/views/partials/dashboard/jobDetails.hbs index c8db1457..53662149 100644 --- a/src/server/views/partials/dashboard/jobDetails.hbs +++ b/src/server/views/partials/dashboard/jobDetails.hbs @@ -24,17 +24,14 @@ {{#if this.options.timestamp}} {{moment this.options.timestamp "llll"}} {{/if}} - {{#if this.timestamp}} - {{moment this.timestamp "llll"}} - {{/if}}
Attempts Made
{{this.attemptsMade}} - {{#if this.options}} - {{length this.options.stacktraces}} + {{#if this.options.stacktrace}} + {{length this.options.stacktrace}} {{/if}}
@@ -46,7 +43,6 @@ -{{#unless this.queue.IS_BEE}}
Progress
- {{ this._progress }}% + style="width: {{ this.progress }}%; min-width: 2em;"> + {{ this.progress }}%
-{{/unless}} -{{#if this.returnvalue}} +{{#if this.returnValue}}
Return Value
-
{{json this.returnvalue true}}
+
{{json this.returnValue true}}
{{/if}} {{#if this.failedReason}} @@ -75,14 +70,8 @@ {{#eq jobState 'failed'}}
Stacktraces
- {{#if this.options.stacktraces}} - {{#each this.options.stacktraces}} -
{{ this }}
- {{/each}} - {{/if}} - - {{#if this.stacktrace}} - {{#each this.stacktrace}} + {{#if this.options.stacktrace}} + {{#each this.options.stacktrace}}
{{ this }}
{{/each}} {{/if}} From 68021e8678dfc084f9ed64cad7e24956c8d52d4b Mon Sep 17 00:00:00 2001 From: Anurag Agarwal Date: Sat, 30 Nov 2019 17:59:44 +0530 Subject: [PATCH 2/7] Removes unused lodash require --- src/server/views/dashboard/jobDetails.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/server/views/dashboard/jobDetails.js b/src/server/views/dashboard/jobDetails.js index dab87d10..6561489b 100644 --- a/src/server/views/dashboard/jobDetails.js +++ b/src/server/views/dashboard/jobDetails.js @@ -1,5 +1,3 @@ -const _ = require('lodash'); - async function handler(req, res) { const { queueName, queueHost, id } = req.params; const { json } = req.query; From 34b99228b754de4a23854abb02a03336c0644d65 Mon Sep 17 00:00:00 2001 From: Anurag Agarwal Date: Thu, 5 Dec 2019 10:03:35 +0530 Subject: [PATCH 3/7] Apply suggestions from code review for removing redundancy Co-Authored-By: Eli Skeggs --- src/server/queue/bee.js | 2 +- src/server/views/helpers/queueHelpers.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/queue/bee.js b/src/server/queue/bee.js index ebd23c29..b1d0976b 100644 --- a/src/server/queue/bee.js +++ b/src/server/queue/bee.js @@ -12,7 +12,7 @@ class BeeJob extends Job { } async getStatus() { - return Promise.resolve(this._job.status); + return this._job.status; } async toJSON() { diff --git a/src/server/views/helpers/queueHelpers.js b/src/server/views/helpers/queueHelpers.js index 6812ff24..770edcf6 100644 --- a/src/server/views/helpers/queueHelpers.js +++ b/src/server/views/helpers/queueHelpers.js @@ -32,7 +32,7 @@ const Helpers = { getStats: async function(queue) { await queue.redisClient.info(); // update queue.client.serverInfo - const stats = _.pickBy(queue.redisClient.serverInfo, (value, key) => _.includes(this._usefulMetrics, key)); + const stats = _.pickBy(queue.redisClient.serverInfo, (value, key) => this._usefulMetrics.includes(key)); stats.used_memory = formatBytes(parseInt(stats.used_memory, 10)); stats.total_system_memory = formatBytes(parseInt(stats.total_system_memory, 10)); return stats; From 0a2f7d1fe85bae0d714ae5b9661be7bab2ec3866 Mon Sep 17 00:00:00 2001 From: Anurag Agarwal Date: Thu, 5 Dec 2019 10:08:13 +0530 Subject: [PATCH 4/7] Fixes the flow for adding job --- src/server/views/api/jobAdd.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/views/api/jobAdd.js b/src/server/views/api/jobAdd.js index cbca9d38..7c16fe23 100644 --- a/src/server/views/api/jobAdd.js +++ b/src/server/views/api/jobAdd.js @@ -8,7 +8,7 @@ async function handler(req, res) { if (!queue) return res.status(404).json({ error: 'queue not found' }); try { - await Queues.set(queue, data); + await queue.addJob(data); } catch (err) { return res.status(500).json({ error: err.message }); } From 584b3e0089422c85d9d2c6d1bd58e46fe951f152 Mon Sep 17 00:00:00 2001 From: Anurag Agarwal Date: Thu, 5 Dec 2019 10:10:24 +0530 Subject: [PATCH 5/7] Removes implicit constructors --- src/server/queue/bee.js | 4 ---- src/server/queue/bull.js | 4 ---- 2 files changed, 8 deletions(-) diff --git a/src/server/queue/bee.js b/src/server/queue/bee.js index b1d0976b..df14c40b 100644 --- a/src/server/queue/bee.js +++ b/src/server/queue/bee.js @@ -3,10 +3,6 @@ const Job = require('./job'); const JobData = require('./jobData'); class BeeJob extends Job { - constructor(job) { - super(job); - } - async remove() { await this._job.remove(); } diff --git a/src/server/queue/bull.js b/src/server/queue/bull.js index 0d17ddcf..9ec02d87 100644 --- a/src/server/queue/bull.js +++ b/src/server/queue/bull.js @@ -8,10 +8,6 @@ const VALID_STATES = ['waiting', 'active', 'completed', 'failed', 'delayed']; const SUPPORTED_ACTIONS = ['remove', 'retry']; class BullJob extends Job { - constructor(job) { - super(job); - } - async remove() { await this._job.remove(); } From 69e0aafd13c0877a2c6d7742a2f5fc95198c6556 Mon Sep 17 00:00:00 2001 From: Anurag Agarwal Date: Thu, 5 Dec 2019 10:25:01 +0530 Subject: [PATCH 6/7] Adds NotImplementedError and uses it in every abstract method --- src/server/error/NotImplementedError.js | 7 +++++++ src/server/queue/job.js | 5 +++++ src/server/queue/queue.js | 18 +++++++++++++----- 3 files changed, 25 insertions(+), 5 deletions(-) create mode 100644 src/server/error/NotImplementedError.js diff --git a/src/server/error/NotImplementedError.js b/src/server/error/NotImplementedError.js new file mode 100644 index 00000000..6c57583d --- /dev/null +++ b/src/server/error/NotImplementedError.js @@ -0,0 +1,7 @@ +module.exports = class NotImplementedError extends Error { + constructor(message = "") { + super(); + this.message = message; + this.name = this.constructor.name; + } +}; diff --git a/src/server/queue/job.js b/src/server/queue/job.js index 2a312f0a..030cd4c6 100644 --- a/src/server/queue/job.js +++ b/src/server/queue/job.js @@ -1,3 +1,5 @@ +const NotImplementedError = require('../error/NotImplementedError'); + module.exports = class Job { constructor(job) { this._job = job; @@ -7,11 +9,14 @@ module.exports = class Job { } async remove() { + throw new NotImplementedError(); } async getStatus() { + throw new NotImplementedError(); } async toJSON() { + throw new NotImplementedError(); } }; diff --git a/src/server/queue/queue.js b/src/server/queue/queue.js index de850975..8af9e643 100644 --- a/src/server/queue/queue.js +++ b/src/server/queue/queue.js @@ -1,3 +1,5 @@ +const NotImplementedError = require('../error/NotImplementedError'); + module.exports = class Queue { constructor(queue) { this._queue = queue; @@ -19,22 +21,28 @@ module.exports = class Queue { return this._queue.client; } - async getJob(id) { + async getJob(_id) { + throw new NotImplementedError(); } async getJobCounts() { + throw new NotImplementedError(); } - async getJobs(state, start, size) { + async getJobs(_state, _start, _size) { + throw new NotImplementedError(); } - async addJob(data, options) { + async addJob(_data, _options) { + throw new NotImplementedError(); } - isValidState(state) { + isValidState(_state) { + throw new NotImplementedError(); } - isActionSupported(action) { + isActionSupported(_action) { + throw new NotImplementedError(); } isPaginationSupported(_state) { From dd8532d723228b32c98d384279b4c7b0527e77a0 Mon Sep 17 00:00:00 2001 From: Anurag Agarwal Date: Thu, 5 Dec 2019 10:28:14 +0530 Subject: [PATCH 7/7] Apply suggestions from code review to change the api return values Co-Authored-By: Eli Skeggs --- src/server/views/api/bulkAction.js | 4 ++-- src/server/views/api/performAction.js | 10 +++++----- src/server/views/dashboard/jobDetails.js | 2 +- src/server/views/dashboard/queueJobsByState.js | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/server/views/api/bulkAction.js b/src/server/views/api/bulkAction.js index 9f1d3516..5bc6c504 100644 --- a/src/server/views/api/bulkAction.js +++ b/src/server/views/api/bulkAction.js @@ -5,10 +5,10 @@ function bulkAction(action) { const { queueName, queueHost } = req.params; const {Queues} = req.app.locals; const queue = await Queues.get(queueName, queueHost); - if (!queue) return res.status(404).send({error: 'queue not found'}); + if (!queue) return void res.status(404).json({error: 'queue not found'}); if (!queue.isActionSupported(action)) { - return res.status(401).send({ + return void res.status(401).json({ error: 'unauthorized action', details: `queue does not support action ${action}` }); diff --git a/src/server/views/api/performAction.js b/src/server/views/api/performAction.js index a6f1cea5..b81d24ef 100644 --- a/src/server/views/api/performAction.js +++ b/src/server/views/api/performAction.js @@ -4,27 +4,27 @@ module.exports = function performAction(action) { const {Queues} = req.app.locals; const queue = await Queues.get(queueName, queueHost); - if (!queue) return res.status(404).send({error: 'queue not found'}); + if (!queue) return void res.status(404).json({error: 'queue not found'}); if (!queue.isActionSupported(action)) { - return res.status(401).send({ + return void res.status(401).json({ error: 'unauthorized action', details: `queue does not support action ${action}` }); } const job = await queue.getJob(id); - if (!job) return res.status(404).send({error: 'job not found'}); + if (!job) return void res.status(404).json({error: 'job not found'}); try { await job[action](); - return res.sendStatus(200); + return void res.sendStatus(204); } catch (e) { const body = { error: 'queue error', details: e.stack }; - return res.status(500).send(body); + return void res.status(500).send(body); } }; }; diff --git a/src/server/views/dashboard/jobDetails.js b/src/server/views/dashboard/jobDetails.js index 6561489b..a3be68d7 100644 --- a/src/server/views/dashboard/jobDetails.js +++ b/src/server/views/dashboard/jobDetails.js @@ -14,7 +14,7 @@ async function handler(req, res) { if (json === 'true') { // Omit these private and non-stringifyable properties to avoid circular // references parsing errors. - return res.json(jobData); + return void res.json(jobData); } const jobState = await job.getStatus(); diff --git a/src/server/views/dashboard/queueJobsByState.js b/src/server/views/dashboard/queueJobsByState.js index b07b8107..b504db34 100644 --- a/src/server/views/dashboard/queueJobsByState.js +++ b/src/server/views/dashboard/queueJobsByState.js @@ -43,7 +43,7 @@ async function _html(req, res) { const basePath = req.baseUrl; if (!queue) return res.status(404).render('dashboard/templates/queueNotFound', {basePath, queueName, queueHost}); - if (!queue.isValidState(state)) return res.status(400).json({ message: `Invalid state requested: ${state}` }); + if (!queue.isValidState(state)) return void res.status(400).json({ message: `Invalid state requested: ${state}` }); const jobCounts = await queue.getJobCounts(); const page = parseInt(req.query.page, 10) || 1;