Skip to content

Commit 3840600

Browse files
committed
...
... ... ... ...
1 parent a4db0d3 commit 3840600

File tree

4 files changed

+132
-66
lines changed

4 files changed

+132
-66
lines changed

src/dialect/mysql/mysql-dialect-config.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ import { DatabaseConnection } from '../../driver/database-connection.js'
66
* https://github.com/sidorares/node-mysql2#using-connection-pools
77
*/
88
export interface MysqlDialectConfig {
9+
/**
10+
* The `mysql2` `createConnection` function or similar.
11+
*
12+
* TODO: ...
13+
*/
14+
createConnection?: (opts: any) => MysqlConnection | Promise<MysqlConnection>
15+
916
/**
1017
* A mysql2 Pool instance or a function that returns one.
1118
*
@@ -41,7 +48,10 @@ export interface MysqlPool {
4148
end(callback: (error: unknown) => void): void
4249
}
4350

44-
export interface MysqlPoolConnection {
51+
export interface MysqlConnection {
52+
config: unknown
53+
connect(callback?: (error: unknown) => void): void
54+
destroy(): void
4555
query(
4656
sql: string,
4757
parameters: ReadonlyArray<unknown>,
@@ -51,6 +61,12 @@ export interface MysqlPoolConnection {
5161
parameters: ReadonlyArray<unknown>,
5262
callback: (error: unknown, result: MysqlQueryResult) => void,
5363
): void
64+
threadId: number
65+
}
66+
67+
export type MysqlConectionConstructor = new (opts?: object) => MysqlConnection
68+
69+
export interface MysqlPoolConnection extends MysqlConnection {
5470
release(): void
5571
}
5672

src/dialect/mysql/mysql-driver.ts

Lines changed: 111 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {
2+
ControlConnectionProvider,
23
DatabaseConnection,
34
QueryResult,
45
} from '../../driver/database-connection.js'
@@ -39,7 +40,10 @@ export class MysqlDriver implements Driver {
3940
let connection = this.#connections.get(rawConnection)
4041

4142
if (!connection) {
42-
connection = new MysqlConnection(rawConnection)
43+
connection = new MysqlConnection(
44+
rawConnection,
45+
this.#config.createConnection,
46+
)
4347
this.#connections.set(rawConnection, connection)
4448

4549
// The driver must take care of calling `onCreateConnection` when a new
@@ -57,18 +61,6 @@ export class MysqlDriver implements Driver {
5761
return connection
5862
}
5963

60-
async #acquireConnection(): Promise<MysqlPoolConnection> {
61-
return new Promise((resolve, reject) => {
62-
this.#pool!.getConnection(async (err, rawConnection) => {
63-
if (err) {
64-
reject(err)
65-
} else {
66-
resolve(rawConnection)
67-
}
68-
})
69-
})
70-
}
71-
7264
async beginTransaction(
7365
connection: DatabaseConnection,
7466
settings: TransactionSettings,
@@ -155,106 +147,163 @@ export class MysqlDriver implements Driver {
155147
})
156148
})
157149
}
150+
151+
async #acquireConnection(): Promise<MysqlPoolConnection> {
152+
return new Promise((resolve, reject) => {
153+
this.#pool!.getConnection(async (err, rawConnection) => {
154+
if (err) {
155+
reject(err)
156+
} else {
157+
resolve(rawConnection)
158+
}
159+
})
160+
})
161+
}
158162
}
159163

160164
function isOkPacket(obj: unknown): obj is MysqlOkPacket {
161165
return isObject(obj) && 'insertId' in obj && 'affectedRows' in obj
162166
}
163167

164168
class MysqlConnection implements DatabaseConnection {
169+
readonly #createConnection: MysqlDialectConfig['createConnection']
165170
readonly #rawConnection: MysqlPoolConnection
166171

167-
constructor(rawConnection: MysqlPoolConnection) {
172+
constructor(
173+
rawConnection: MysqlPoolConnection,
174+
createConnection: MysqlDialectConfig['createConnection'],
175+
) {
176+
this.#createConnection = createConnection
168177
this.#rawConnection = rawConnection
169178
}
170179

180+
async cancelQuery(
181+
controlConnectionProvider: ControlConnectionProvider,
182+
): Promise<void> {
183+
try {
184+
// this removes the connection from the pool.
185+
// this is done to avoid picking it up after the `kill` command next, which
186+
// would cause an error when attempting to query.
187+
this.#rawConnection.destroy()
188+
} catch {
189+
// noop
190+
}
191+
192+
const { config, threadId } = this.#rawConnection
193+
194+
// this kills the query and the connection database-side.
195+
// we're not using `kill query <connection_id>` here because it doesn't
196+
// guarantee that the query is killed immediately. we saw that in tests,
197+
// the query can still run for a while after - including registering writes.
198+
const cancelQuery = `kill connection ${threadId}`
199+
200+
if (this.#createConnection && config) {
201+
const controlConnection = await this.#createConnection({ ...config })
202+
203+
return await new Promise((resolve, reject) => {
204+
controlConnection.connect((connectError) => {
205+
if (connectError) {
206+
return reject(connectError)
207+
}
208+
209+
controlConnection.query(cancelQuery, [], (error) => {
210+
controlConnection.destroy()
211+
212+
if (error) {
213+
return reject(error)
214+
}
215+
216+
resolve()
217+
})
218+
})
219+
})
220+
}
221+
222+
await controlConnectionProvider(async (controlConnection) => {
223+
await controlConnection.executeQuery(CompiledQuery.raw(cancelQuery, []))
224+
})
225+
}
226+
171227
async executeQuery<O>(compiledQuery: CompiledQuery): Promise<QueryResult<O>> {
172228
try {
173229
const result = await this.#executeQuery(compiledQuery)
174230

175-
if (isOkPacket(result)) {
176-
const { insertId, affectedRows, changedRows } = result
177-
178-
return {
179-
insertId:
180-
insertId !== undefined &&
181-
insertId !== null &&
182-
insertId.toString() !== '0'
183-
? BigInt(insertId)
184-
: undefined,
185-
numAffectedRows:
186-
affectedRows !== undefined && affectedRows !== null
187-
? BigInt(affectedRows)
188-
: undefined,
189-
numChangedRows:
190-
changedRows !== undefined && changedRows !== null
191-
? BigInt(changedRows)
192-
: undefined,
193-
rows: [],
194-
}
195-
} else if (Array.isArray(result)) {
231+
if (!isOkPacket(result)) {
196232
return {
197-
rows: result as O[],
233+
rows: (Array.isArray(result) ? result : []) as never,
198234
}
199235
}
200236

237+
const { insertId, affectedRows, changedRows } = result
238+
201239
return {
240+
insertId:
241+
insertId !== undefined &&
242+
insertId !== null &&
243+
insertId.toString() !== '0'
244+
? BigInt(insertId)
245+
: undefined,
246+
numAffectedRows:
247+
affectedRows !== undefined && affectedRows !== null
248+
? BigInt(affectedRows)
249+
: undefined,
250+
numChangedRows:
251+
changedRows !== undefined && changedRows !== null
252+
? BigInt(changedRows)
253+
: undefined,
202254
rows: [],
203255
}
204256
} catch (err) {
205257
throw extendStackTrace(err, new Error())
206258
}
207259
}
208260

209-
#executeQuery(compiledQuery: CompiledQuery): Promise<MysqlQueryResult> {
210-
return new Promise((resolve, reject) => {
211-
this.#rawConnection.query(
212-
compiledQuery.sql,
213-
compiledQuery.parameters,
214-
(err, result) => {
215-
if (err) {
216-
reject(err)
217-
} else {
218-
resolve(result)
219-
}
220-
},
221-
)
222-
})
223-
}
224-
225261
async *streamQuery<O>(
226262
compiledQuery: CompiledQuery,
227263
_chunkSize: number,
228264
): AsyncIterableIterator<QueryResult<O>> {
229265
const stream = this.#rawConnection
230266
.query(compiledQuery.sql, compiledQuery.parameters)
231-
.stream<O>({
232-
objectMode: true,
233-
})
267+
.stream<O>({ objectMode: true })
234268

235269
try {
236270
for await (const row of stream) {
237271
yield {
238272
rows: [row],
239273
}
240274
}
241-
} catch (ex) {
275+
} catch (error) {
242276
if (
243-
ex &&
244-
typeof ex === 'object' &&
245-
'code' in ex &&
246-
// @ts-ignore
247-
ex.code === 'ERR_STREAM_PREMATURE_CLOSE'
277+
error &&
278+
typeof error === 'object' &&
279+
'code' in error &&
280+
error.code === 'ERR_STREAM_PREMATURE_CLOSE'
248281
) {
249282
// Most likely because of https://github.com/mysqljs/mysql/blob/master/lib/protocol/sequences/Query.js#L220
250283
return
251284
}
252285

253-
throw ex
286+
throw error
254287
}
255288
}
256289

257290
[PRIVATE_RELEASE_METHOD](): void {
258291
this.#rawConnection.release()
259292
}
293+
294+
#executeQuery(compiledQuery: CompiledQuery): Promise<MysqlQueryResult> {
295+
return new Promise((resolve, reject) => {
296+
this.#rawConnection.query(
297+
compiledQuery.sql,
298+
compiledQuery.parameters,
299+
(err, result) => {
300+
if (err) {
301+
reject(err)
302+
} else {
303+
resolve(result)
304+
}
305+
},
306+
)
307+
})
308+
}
260309
}

test/node/src/cancellation.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
} from './test-setup.js'
1313

1414
for (const dialect of DIALECTS) {
15-
if (dialect === 'postgres') {
15+
if (dialect === 'postgres' || dialect === 'mysql') {
1616
describe(`${dialect}: query cancellation`, () => {
1717
let ctx: TestContext
1818

@@ -64,7 +64,7 @@ for (const dialect of DIALECTS) {
6464
const delayedQuery = (
6565
{
6666
postgres: sql`select pg_sleep(0.1); ${writeQuery};`,
67-
mysql: sql`select sleep(0.1); ${writeQuery};`,
67+
mysql: sql`do sleep(0.1); ${writeQuery};`,
6868
mssql: sql`waitfor delay '00:00:00.100'; ${writeQuery};`,
6969
sqlite: sql`WITH RECURSIVE timer(i) AS (
7070
SELECT 1

test/node/src/test-setup.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import chaiAsPromised from 'chai-as-promised'
33
import * as chaiSubset from 'chai-subset'
44
import * as Cursor from 'pg-cursor'
55
import { Pool, PoolConfig } from 'pg'
6-
import { createPool } from 'mysql2'
6+
import { createConnection, createPool } from 'mysql2'
77
import * as Database from 'better-sqlite3'
88
import * as Tarn from 'tarn'
99
import * as Tedious from 'tedious'
@@ -201,6 +201,7 @@ export const DB_CONFIGS: PerDialectVariant<KyselyConfig> = {
201201

202202
mysql: {
203203
dialect: new MysqlDialect({
204+
createConnection,
204205
pool: async () => createPool(DIALECT_CONFIGS.mysql),
205206
}),
206207
plugins: PLUGINS,

0 commit comments

Comments
 (0)