diff --git a/src/connection.ts b/src/connection.ts index d446c9922..48aa42880 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -3432,7 +3432,10 @@ class Connection extends EventEmitter { const handler = new Login7TokenHandler(this); const tokenStreamParser = this.createTokenStreamParser(message, handler); - await once(tokenStreamParser, 'end'); + await Promise.race([ + once(tokenStreamParser, 'end'), + signalAborted + ]); if (handler.loginAckReceived) { return handler.routingData; diff --git a/test/unit/connection-failure-test.ts b/test/unit/connection-failure-test.ts index 4ff1c08bf..3899abdfd 100644 --- a/test/unit/connection-failure-test.ts +++ b/test/unit/connection-failure-test.ts @@ -6,6 +6,7 @@ import OutgoingMessageStream from '../../src/outgoing-message-stream'; import Debug from '../../src/debug'; import PreloginPayload from '../../src/prelogin-payload'; import Message from '../../src/message'; +import { Packet } from '../../src/packet'; function buildLoginAckToken(): Buffer { const progname = 'Tedious SQL Server'; @@ -210,6 +211,153 @@ describe('Connection failure handling', function() { }); }); + it('should fail correctly when the connection is aborted while the Login7 response is being received', function(done) { + server.on('connection', async (connection) => { + const debug = new Debug(); + const incomingMessageStream = new IncomingMessageStream(debug); + const outgoingMessageStream = new OutgoingMessageStream(debug, { packetSize: 4 * 1024 }); + + connection.pipe(incomingMessageStream); + outgoingMessageStream.pipe(connection); + + try { + const messageIterator = incomingMessageStream[Symbol.asyncIterator](); + + // PRELOGIN + { + const { value: message } = await messageIterator.next(); + assert.strictEqual(message.type, 0x12); + + const chunks: Buffer[] = []; + for await (const data of message) { + chunks.push(data); + } + + const responsePayload = new PreloginPayload({ encrypt: false, version: { major: 1, minor: 2, build: 3, subbuild: 0 } }); + const responseMessage = new Message({ type: 0x12 }); + responseMessage.end(responsePayload.data); + outgoingMessageStream.write(responseMessage); + } + + // LOGIN7 + { + const { value: message } = await messageIterator.next(); + assert.strictEqual(message.type, 0x10); + + const chunks: Buffer[] = []; + for await (const data of message) { + chunks.push(data); + } + + // Send the first packet of an incomplete response message + // (the `EOM` status bit is not set), then abort the connection + // before the message is ever completed. + const packet = new Packet(0x04); + packet.addData(buildLoginAckToken()); + connection.write(packet.buffer); + + setImmediate(() => { + connection.destroy(); + }); + } + } catch (err) { + console.log(err); + } + }); + + const connection = new Connection({ + server: (server.address() as net.AddressInfo).address, + options: { + port: (server.address() as net.AddressInfo).port, + encrypt: false + } + }); + + connection.connect((err) => { + connection.close(); + + assert.instanceOf(err, ConnectionError); + assert.strictEqual('Connection lost - socket hang up', err.message); + + assert.instanceOf(err.cause, Error); + assert.strictEqual('socket hang up', err.cause.message); + + done(); + }); + }); + + it('should time out correctly when the Login7 response stalls', function(done) { + server.on('connection', async (connection) => { + const debug = new Debug(); + const incomingMessageStream = new IncomingMessageStream(debug); + const outgoingMessageStream = new OutgoingMessageStream(debug, { packetSize: 4 * 1024 }); + + connection.pipe(incomingMessageStream); + outgoingMessageStream.pipe(connection); + + try { + const messageIterator = incomingMessageStream[Symbol.asyncIterator](); + + // PRELOGIN + { + const { value: message } = await messageIterator.next(); + assert.strictEqual(message.type, 0x12); + + const chunks: Buffer[] = []; + for await (const data of message) { + chunks.push(data); + } + + const responsePayload = new PreloginPayload({ encrypt: false, version: { major: 1, minor: 2, build: 3, subbuild: 0 } }); + const responseMessage = new Message({ type: 0x12 }); + responseMessage.end(responsePayload.data); + outgoingMessageStream.write(responseMessage); + } + + // LOGIN7 + { + const { value: message } = await messageIterator.next(); + assert.strictEqual(message.type, 0x10); + + const chunks: Buffer[] = []; + for await (const data of message) { + chunks.push(data); + } + + // Send the first packet of an incomplete response message + // (the `EOM` status bit is not set), then go silent, leaving + // the message unfinished. + const packet = new Packet(0x04); + packet.addData(buildLoginAckToken()); + connection.write(packet.buffer); + } + } catch (err) { + console.log(err); + } + }); + + const addressInfo = server.address() as net.AddressInfo; + + const connection = new Connection({ + server: addressInfo.address, + options: { + port: addressInfo.port, + encrypt: false, + connectTimeout: 1000 + } + }); + + connection.connect((err) => { + connection.close(); + + assert.instanceOf(err, ConnectionError); + assert.strictEqual(err.code, 'ETIMEOUT'); + assert.strictEqual(`Failed to connect to ${addressInfo.address}:${addressInfo.port} in 1000ms`, err.message); + + done(); + }); + }); + it('should fail correctly when the connection is aborted after the Login7 response is received', function(done) { server.on('connection', async (connection) => { const debug = new Debug();