Skip to content

Commit e0b5816

Browse files
committed
Fixed blob stream deadlock
Fixed the issue in SAP#233 where streaming blobs out of the database into another table can cause a deadlock - Added a "blocked" mode to the Queue which prevents tasks from running except for the blocking task and READ_LOB tasks - Modified ExecuteTask's run to free the queue while it waits for the Writer's getParameters - The callback of getParameters will enqueue the task again to send the packet - Before the freeing of the queue to the next task, ExecuteTask will block the queue to only allow itself and READ_LOB tasks to run - This prevents issues where exec's can run at the same time which will lead to HANA disconnecting and sending invalid LOB locator id errors
1 parent 426a1bf commit e0b5816

File tree

5 files changed

+468
-87
lines changed

5 files changed

+468
-87
lines changed

lib/protocol/Connection.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ Connection.prototype.enqueue = function enqueue(task, cb) {
441441
if (task instanceof request.Segment) {
442442
queueable = this._queue.createTask(this.send.bind(this, task), cb);
443443
queueable.name = MessageTypeName[task.type];
444+
queueable.msgType = task.type;
444445
} else if (util.isFunction(task.run)) {
445446
queueable = task;
446447
}
@@ -732,6 +733,10 @@ Connection.prototype.isIdle = function isIdle() {
732733
return this._queue.empty && !this._queue.busy;
733734
};
734735

736+
Connection.prototype.blockQueue = function blockQueue(blockingTask) {
737+
this._queue.block(blockingTask);
738+
}
739+
735740
Connection.prototype.setAutoCommit = function setAutoCommit(autoCommit) {
736741
this._transaction.autoCommit = autoCommit;
737742
};

lib/protocol/ExecuteTask.js

Lines changed: 83 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ function ExecuteTask(connection, options, callback) {
3939
}
4040
this.callback = callback;
4141
this.reply = undefined;
42+
this.finishedError = null;
43+
this.finishedParameters = undefined;
44+
this.isExecuteParams = true;
4245
}
4346

4447
ExecuteTask.create = function createExecuteTask(connection, options, cb) {
@@ -59,60 +62,79 @@ ExecuteTask.prototype.run = function run(next) {
5962
}
6063
if (err) {
6164
return self.sendRollback(function () {
62-
// ignore roolback error
65+
// ignore rollback error
6366
done(err);
6467
});
6568
}
6669
self.sendCommit(done);
6770
}
6871

69-
function execute() {
72+
function getExecuteRequest() {
7073
if (!self.parameterValues.length && !self.writer.hasParameters) {
7174
return finalize();
7275
}
73-
self.sendExecute(function receive(err, reply) {
74-
if (err) {
75-
return finalize(err);
76-
}
77-
if (!self.writer.finished && reply.rowsAffected == -1) {
78-
reply.rowsAffected = undefined;
79-
}
80-
self.pushReply(reply);
81-
if (!self.writer.finished && reply.writeLobReply) {
82-
self.writer.update(reply.writeLobReply);
83-
}
84-
writeLob();
76+
self.finishedParameters = undefined;
77+
var availableSize = self.connection.getAvailableSize(false) - STATEMENT_ID_PART_LENGTH;
78+
var availableSizeForLOBs = self.connection.getAvailableSize(true) - STATEMENT_ID_PART_LENGTH;
79+
80+
// Block the queue to this task and read lob requests
81+
self.connection.blockQueue(self);
82+
83+
self.getParameters(availableSize, availableSizeForLOBs, function send(err, parameters) {
84+
// Enqueue itself to wait for when the task becomes the one actively running in the queue
85+
// and the connection is avaliable to send the packet
86+
self.finishedError = err;
87+
self.finishedParameters = parameters;
88+
self.isExecuteParams = true;
89+
self.connection.enqueue(self);
8590
});
91+
92+
// Yield to only read lob tasks in the queue, the callback will enqueue this task
93+
// again once the parameters are ready
94+
next();
8695
}
8796

88-
function writeLob() {
97+
function getWriteLobRequest() {
8998
if (self.writer.finished || self.writer.hasParameters) {
90-
return execute();
99+
return getExecuteRequest();
91100
}
92-
self.sendWriteLobRequest(function receive(err, reply) {
93-
/* jshint unused:false */
94-
if (err) {
95-
return finalize(err);
96-
}
97-
self.pushReply(reply);
98-
writeLob();
101+
self.finishedParameters = undefined;
102+
var availableSize = self.connection.getAvailableSize(true);
103+
self.connection.blockQueue(self);
104+
self.writer.getWriteLobRequest(availableSize, function (err, buffer) {
105+
self.finishedError = err;
106+
self.finishedParameters = buffer;
107+
self.isExecuteParams = false;
108+
self.connection.enqueue(self);
99109
});
110+
111+
next();
100112
}
101113

102-
// validate function code
103-
if (self.parameterValues.length > 1) {
104-
switch (self.functionCode) {
105-
case FunctionCode.DDL:
106-
case FunctionCode.INSERT:
107-
case FunctionCode.UPDATE:
108-
case FunctionCode.DELETE:
109-
break;
110-
default:
111-
return done(createInvalidFunctionCodeError());
114+
if (this.finishedError) {
115+
finalize(this.finishedError);
116+
} else if (this.finishedParameters) {
117+
if (this.isExecuteParams) {
118+
self.sendExecute(this.finishedParameters, finalize, getWriteLobRequest);
119+
} else {
120+
self.sendWriteLobRequest(this.finishedParameters, finalize, getWriteLobRequest);
121+
}
122+
} else { // No stored error or parameters, so get initial execute data
123+
// validate function code
124+
if (self.parameterValues.length > 1) {
125+
switch (self.functionCode) {
126+
case FunctionCode.DDL:
127+
case FunctionCode.INSERT:
128+
case FunctionCode.UPDATE:
129+
case FunctionCode.DELETE:
130+
break;
131+
default:
132+
return done(createInvalidFunctionCodeError());
133+
}
112134
}
113-
}
114135

115-
execute();
136+
getExecuteRequest();
137+
}
116138
};
117139

118140
ExecuteTask.prototype.end = function end(err) {
@@ -195,35 +217,40 @@ ExecuteTask.prototype.getParameters = function getParameters(availableSize, avai
195217
next();
196218
};
197219

198-
ExecuteTask.prototype.sendExecute = function sendExecute(cb) {
220+
ExecuteTask.prototype.sendExecute = function sendExecute(parameters, finalize, cb) {
199221
var self = this;
200-
var availableSize = self.connection.getAvailableSize(false) - STATEMENT_ID_PART_LENGTH;
201-
var availableSizeForLOBs = self.connection.getAvailableSize(true) - STATEMENT_ID_PART_LENGTH;
202-
self.getParameters(availableSize, availableSizeForLOBs, function send(err, parameters) {
222+
self.connection.send(request.execute({
223+
autoCommit: self.autoCommit,
224+
holdCursorsOverCommit: self.holdCursorsOverCommit,
225+
scrollableCursor: self.scrollableCursor,
226+
statementId: self.statementId,
227+
parameters: parameters,
228+
useCesu8: self.connection.useCesu8
229+
}), function (err, reply) {
203230
if (err) {
204-
return cb(err);
231+
return finalize(err);
205232
}
206-
self.connection.send(request.execute({
207-
autoCommit: self.autoCommit,
208-
holdCursorsOverCommit: self.holdCursorsOverCommit,
209-
scrollableCursor: self.scrollableCursor,
210-
statementId: self.statementId,
211-
parameters: parameters,
212-
useCesu8: self.connection.useCesu8
213-
}), cb);
233+
if (!self.writer.finished && reply.rowsAffected == -1) {
234+
reply.rowsAffected = undefined;
235+
}
236+
self.pushReply(reply);
237+
if (!self.writer.finished && reply.writeLobReply) {
238+
self.writer.update(reply.writeLobReply);
239+
}
240+
cb();
214241
});
215-
};
242+
}
216243

217-
ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(cb) {
244+
ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(buffer, finalize, cb) {
218245
var self = this;
219-
var availableSize = self.connection.getAvailableSize(true);
220-
self.writer.getWriteLobRequest(availableSize, function send(err, buffer) {
246+
self.connection.send(request.writeLob({
247+
writeLobRequest: buffer
248+
}), function (err, reply) {
221249
if (err) {
222-
return cb(err);
250+
return finalize(err);
223251
}
224-
self.connection.send(request.writeLob({
225-
writeLobRequest: buffer
226-
}), cb);
252+
self.pushReply(reply);
253+
cb();
227254
});
228255
};
229256

lib/util/Queue.js

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
var util = require('util');
1717
var EventEmitter = require('events').EventEmitter;
18+
var MessageType = require('../protocol/common/MessageType');
1819

1920
module.exports = Queue;
2021

@@ -26,6 +27,12 @@ function Queue(immediate) {
2627
this.queue = [];
2728
this.busy = false;
2829
this.running = !!immediate;
30+
// Records read lob tasks which can be called out of position when
31+
// the queue is blocked. If other tasks need to be called out of position
32+
// this can be changed to a Map with the message type as keys.
33+
this.readLobQueue = [];
34+
this.blocked = false;
35+
this.blockingTask = undefined;
2936
}
3037

3138
Object.defineProperty(Queue.prototype, 'empty', {
@@ -36,14 +43,25 @@ Object.defineProperty(Queue.prototype, 'empty', {
3643

3744
Queue.prototype.unshift = function unshift(task) {
3845
this.queue.unshift(task);
39-
if (this.running) {
46+
if (task.msgType === MessageType.READ_LOB) {
47+
this.readLobQueue.unshift(task);
48+
}
49+
if (this.blocked && this._isBlockingTask(task)) {
50+
this.emit('unblock', task);
51+
} else if (this.running) {
4052
this.dequeue();
4153
}
4254
return this;
4355
};
4456

4557
Queue.prototype.push = function push(task) {
58+
if (this.blocked && this._isBlockingTask(task)) {
59+
return this.unshift(task);
60+
}
4661
this.queue.push(task);
62+
if (task.msgType === MessageType.READ_LOB) {
63+
this.readLobQueue.push(task);
64+
}
4765
if (this.running) {
4866
this.dequeue();
4967
}
@@ -72,14 +90,28 @@ Queue.prototype.abort = function abort(err) {
7290
return this;
7391
};
7492

75-
Queue.prototype.createTask = function createTask(send, receive, name) {
76-
return new Task(send, receive, name);
93+
Queue.prototype.createTask = function createTask(send, receive, name, msgType) {
94+
return new Task(send, receive, name, msgType);
7795
};
7896

97+
Queue.prototype.block = function block(blockingTask) {
98+
this.blocked = true;
99+
this.blockingTask = blockingTask;
100+
}
101+
102+
Queue.prototype.unblock = function unblock() {
103+
this.blocked = false;
104+
this.blockingTask = undefined;
105+
}
106+
107+
Queue.prototype._isBlockingTask = function _isBlockingTask(task) {
108+
return task === this.blockingTask || task.msgType === MessageType.READ_LOB;
109+
}
110+
79111
Queue.prototype.dequeue = function dequeue() {
80112
var self = this;
81113

82-
function next(err, name) {
114+
function runNext() {
83115
/* jshint unused:false */
84116
self.busy = false;
85117
if (self.queue.length) {
@@ -89,21 +121,77 @@ Queue.prototype.dequeue = function dequeue() {
89121
}
90122
}
91123

124+
function runReadLob() {
125+
if (self.readLobQueue.length) {
126+
self.busy = false;
127+
if (self.running && !self.busy) {
128+
self.busy = true;
129+
var task = self.readLobQueue.shift();
130+
// Mark the task as ran so it will be skipped in the queue
131+
task.ran = true;
132+
// Optimization: When blocked, often read lobs are the most recently
133+
// added at the beginning or end of the queue so they can be removed from there
134+
// Note that the queue is not empty since it always has at least as many elements
135+
// as the readLobQueue
136+
if (self.queue[0] === task) {
137+
self.queue.shift();
138+
} else if (self.queue[self.queue.length - 1] === task) {
139+
self.queue.pop();
140+
}
141+
task.run(next);
142+
}
143+
} else {
144+
runNext();
145+
}
146+
}
147+
148+
function next(err, name) {
149+
if (self.blocked) {
150+
// Check if there exists a task that can be run
151+
if (self.queue.length && self.blockingTask === self.queue[0]) {
152+
self.unblock();
153+
runNext();
154+
} else if (self.readLobQueue.length) {
155+
runReadLob();
156+
} else {
157+
self.once('unblock', function runTask (task) {
158+
if (task === self.blockingTask) {
159+
self.unblock();
160+
runNext();
161+
} else {
162+
runReadLob();
163+
}
164+
});
165+
}
166+
} else {
167+
runNext();
168+
}
169+
}
170+
92171
function run() {
93172
if (self.running && !self.busy) {
94173
// Queue is running and not busy
95174
self.busy = true;
96175
var task = self.queue.shift();
97-
task.run(next);
176+
if (task.ran) {
177+
next(null, task.name);
178+
} else {
179+
if (task.msgType === MessageType.READ_LOB) {
180+
self.readLobQueue.shift();
181+
}
182+
task.run(next);
183+
}
98184
}
99185
}
100186
run();
101187
};
102188

103-
function Task(send, receive, name) {
189+
function Task(send, receive, name, msgType) {
104190
this.send = send;
105191
this.receive = receive;
106192
this.name = name;
193+
this.msgType = msgType;
194+
this.ran = false;
107195
}
108196

109197
Task.prototype.run = function run(next) {

0 commit comments

Comments
 (0)