-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
64 lines (56 loc) · 2.69 KB
/
Copy pathindex.js
File metadata and controls
64 lines (56 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class TarantoolQueue {
constructor(conn, queueName, debug = false, actionCallback = undefined) {
this.conn = conn;
this.queueName = queueName;
this.debug = debug;
this.actionCallback = actionCallback; // useful to write stats
this.conn.eval('queue = require("queue")');
}
async _call(expression, ...parameters) {
try {
let paramsPrepared = parameters.map(p => {
if (typeof p == 'object' || typeof p == 'array')
return JSON.stringify(p)
.replace(/\"([^(\")"]+)\":/g,"$1:") // removing quotes from JSON properties
.replace(/:"/g, '="');
if (typeof p == 'string') return '"' + p + '"';
return p;
});
let query = 'return ' + expression + '(' + paramsPrepared.filter(p => p !== undefined).join(', ') + ')';
if (this.debug) console.log(query);
return await this.conn.eval(query);
} catch (err) {
if (/a nil value/.test(err.message))
throw 'Queue "' + this.queueName + '" does not exist';
if (/unsupported Lua type/.test(err.message))
return; // not an error @see https://github.com/tarantool/doc/commit/5d477de2103fa132a40cec9361c374a2191a5130#diff-e1a06d4e0ab9d28d02604cadea51b835
throw err.message;
}
}
_callForQueue (method, ...parameters) {
if (typeof this.actionCallback === 'function') {
this.actionCallback(method, ...parameters);
}
return this._call("queue.tube['" + this.queueName + "']:" + method, ...parameters);
}
getAll() {
return this._call("box.space['" + this.queueName + "']:select" );
}
create_tube(type = 'fifo', temporary = true) {
return this._call('queue.create_tube', this.queueName, type, { temporary });
}
drop() { return this._callForQueue('drop'); }
put(taskData, options) { return this._callForQueue('put', taskData, options); }
take(timeout) { return this._callForQueue('take', timeout); }
touch(taskId, increment) { return this._callForQueue('touch', taskId, increment); }
ack(taskId) { return this._callForQueue('ack', taskId); }
release(taskId, options) { return this._callForQueue('release', taskId, options); }
peek(taskId) { return this._callForQueue('peek', taskId); }
bury(taskId) { return this._callForQueue('bury', taskId); }
kick(count) { return this._callForQueue('kick', count); }
delete(taskId) { return this._callForQueue('delete', taskId); }
statistics() {
return this._call('queue.statistics', this.queueName);
}
}
module.exports = TarantoolQueue;