Skip to content

Commit f8deb1b

Browse files
committed
feat: support for assigning new tasks in batches , to avoid network overhead
feat: support for assigning new tasks in batches , to avoid network overhead
1 parent c3014c4 commit f8deb1b

File tree

3 files changed

+19
-3
lines changed

3 files changed

+19
-3
lines changed

env.sample

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
token=
44
# transferEncryptToken should be 32 characters long
55
transferEncryptToken=
6-
6+
# enable/disable assigning new tasks in batches , values true/false
7+
getBatch=true
8+
# set batch size if enabled , default is 5
9+
batchSize=5
710
# LOG_LEVEL=off
811
LOG_LEVEL=info
912
# LOG_LEVEL=warn

example/index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ const Worker = require('../index');
55
process.env.transferEncryptToken = '00000000000000000000000000000000';
66
process.env.token = 'demoCHannel0';
77
process.env.LOG_LEVEL = 'debug';
8+
process.env.getBatch=true;
9+
process.env.batchSize=10;
10+
811

912
async function run() {
1013
// eslint-disable-next-line no-unused-vars

src/index.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class Worker {
3737
this.taskFile = 'null';
3838
this.jobsDone = 0;
3939
this.jobsToDo = []; // push here jobs assigned due working..
40+
this.getBatch = Object.prototype.hasOwnProperty.call(process.env, 'getBatch') ? process.env.getBatch : false
41+
this.batchSize = Object.prototype.hasOwnProperty.call(process.env, 'batchSize') ? process.env.batchSize : 5
4042
this.init = this.init.bind(this);
4143
this.onSeen = this.onSeen.bind(this);
4244
this.requestWork = this.requestWork.bind(this);
@@ -112,8 +114,8 @@ class Worker {
112114
if (Helper.getTimestamp() - this.lastRequestWork > minRequestWorkWindow) {
113115
// prevent unnecessary requests , flooding Master
114116
this.lastRequestWork = Helper.getTimestamp(); // track last request for work..
115-
this.peer.rpc(this.MasterAdress, 'requestWork', {}, async (masterAns) => {
116-
if (masterAns.task) {
117+
this.peer.rpc(this.MasterAdress, 'requestWork', { getBatch: this.getBatch , batchSize: this.batchSize }, async (masterAns) => {
118+
if (masterAns.task) { // single task
117119
// null if no jobs available
118120
const job = this.crypt.decrypt(JSON.parse(masterAns.task)); // decrypt once incoming job data
119121
const startedOn = Helper.getTimestamp();
@@ -126,6 +128,14 @@ class Worker {
126128
this.log.warn(e.message);
127129
});
128130
}
131+
if (masterAns.batchTasks ){ // received batch tasks
132+
masterAns.batchTasks.forEach((encryptedTask)=>{
133+
const job = this.crypt.decrypt(JSON.parse(encryptedTask));
134+
this.jobsToDo.push(JSON.parse(job)) // push decrypted job to internal queue
135+
})
136+
// this.log.fatal(this.jobsToDo)
137+
this.event.emit('requestWork');
138+
}
129139
});
130140
}
131141
}

0 commit comments

Comments
 (0)