11const os = require ( 'os' )
22const fs = require ( 'fs' )
3- const Stream = require ( 'stream' )
43
54const {
65 mergeUserTypes,
@@ -21,6 +20,7 @@ const { Query, CLOSE } = require('./query.js')
2120const Queue = require ( './queue.js' )
2221const { Errors, PostgresError } = require ( './errors.js' )
2322const Subscribe = require ( './subscribe.js' )
23+ const largeObject = require ( './large.js' )
2424
2525Object . assign ( Postgres , {
2626 PostgresError,
@@ -42,21 +42,22 @@ function Postgres(a, b) {
4242 let ending = false
4343
4444 const queries = Queue ( )
45- , connections = [ ...Array ( options . max ) ] . map ( ( ) => Connection ( options , { onopen, onend, ondrain, onclose } ) )
46- , closed = Queue ( connections )
45+ , connecting = Queue ( )
4746 , reserved = Queue ( )
47+ , closed = Queue ( )
48+ , ended = Queue ( )
4849 , open = Queue ( )
4950 , busy = Queue ( )
5051 , full = Queue ( )
51- , ended = Queue ( )
52- , connecting = Queue ( )
53- , queues = { closed , ended , connecting , reserved , open , busy , full }
52+ , queues = { connecting , reserved , closed , ended , open , busy , full }
53+
54+ const connections = [ ... Array ( options . max ) ] . map ( ( ) => Connection ( options , queues , { onopen , onend , onclose } ) )
5455
5556 const sql = Sql ( handler )
5657
5758 Object . assign ( sql , {
5859 get parameters ( ) { return options . parameters } ,
59- largeObject,
60+ largeObject : largeObject . bind ( null , sql ) ,
6061 subscribe,
6162 CLOSE ,
6263 END : CLOSE ,
@@ -229,90 +230,28 @@ function Postgres(a, b) {
229230
230231 function handler ( q ) {
231232 q . catch ( e => uncaughtError || ( uncaughtError = e ) )
232- c . state === ' full'
233+ c . queue === full
233234 ? queries . push ( q )
234- : c . execute ( q ) || ( c . state = 'full' , full . push ( c ) )
235+ : c . execute ( q ) || move ( c , full )
235236 }
236237 }
237238
238239 function onexecute ( c ) {
239- queues [ c . state ] . remove ( c )
240- c . state = ' reserved'
240+ connection = c
241+ move ( c , reserved )
241242 c . reserved = ( ) => queries . length
242243 ? c . execute ( queries . shift ( ) )
243- : c . state = 'reserved'
244- reserved . push ( c )
245- connection = c
244+ : move ( c , reserved )
246245 }
247246 }
248247
249- function largeObject ( oid , mode = 0x00020000 | 0x00040000 ) {
250- return new Promise ( async ( resolve , reject ) => {
251- await sql . begin ( async sql => {
252- let finish
253- ! oid && ( [ { oid } ] = await sql `select lo_creat(-1) as oid` )
254- const [ { fd } ] = await sql `select lo_open(${ oid } , ${ mode } ) as fd`
255-
256- const lo = {
257- writable,
258- readable,
259- close : ( ) => sql `select lo_close(${ fd } )` . then ( finish ) ,
260- tell : ( ) => sql `select lo_tell64(${ fd } )` ,
261- read : ( x ) => sql `select loread(${ fd } , ${ x } ) as data` ,
262- write : ( x ) => sql `select lowrite(${ fd } , ${ x } )` ,
263- truncate : ( x ) => sql `select lo_truncate64(${ fd } , ${ x } )` ,
264- seek : ( x , whence = 0 ) => sql `select lo_lseek64(${ fd } , ${ x } , ${ whence } )` ,
265- size : ( ) => sql `
266- select
267- lo_lseek64(${ fd } , location, 0) as position,
268- seek.size
269- from (
270- select
271- lo_lseek64($1, 0, 2) as size,
272- tell.location
273- from (select lo_tell64($1) as location) tell
274- ) seek
275- `
276- }
277-
278- resolve ( lo )
279-
280- return new Promise ( async r => finish = r )
281-
282- async function readable ( {
283- highWaterMark = 2048 * 8 ,
284- start = 0 ,
285- end = Infinity
286- } = { } ) {
287- let max = end - start
288- start && await lo . seek ( start )
289- return new Stream . Readable ( {
290- highWaterMark,
291- async read ( size ) {
292- const l = size > max ? size - max : size
293- max -= size
294- const [ { data } ] = await lo . read ( l )
295- this . push ( data )
296- if ( data . length < size )
297- this . push ( null )
298- }
299- } )
300- }
301-
302- async function writable ( {
303- highWaterMark = 2048 * 8 ,
304- start = 0
305- } = { } ) {
306- start && await lo . seek ( start )
307- return new Stream . Writable ( {
308- highWaterMark,
309- write ( chunk , encoding , callback ) {
310- lo . write ( chunk ) . then ( ( ) => callback ( ) , callback )
311- }
312- } )
313- }
314- } ) . catch ( reject )
315- } )
248+ function move ( c , queue ) {
249+ c . queue . remove ( c )
250+ queue . push ( c )
251+ c . queue = queue
252+ queue === open
253+ ? c . idleTimer . start ( )
254+ : c . idleTimer . cancel ( )
316255 }
317256
318257 function json ( x ) {
@@ -331,28 +270,27 @@ function Postgres(a, b) {
331270 return query . reject ( Errors . connection ( 'CONNECTION_ENDED' , options , options ) )
332271
333272 if ( open . length )
334- return go ( open , query )
273+ return go ( open . shift ( ) , query )
335274
336275 if ( closed . length )
337276 return connect ( closed . shift ( ) , query )
338277
339278 busy . length
340- ? go ( busy , query )
279+ ? go ( busy . shift ( ) , query )
341280 : queries . push ( query )
342281 }
343282
344- function go ( xs , query ) {
345- const c = xs . shift ( )
283+ function go ( c , query ) {
346284 return c . execute ( query )
347- ? ( c . state = 'busy' , busy . push ( c ) )
348- : ( c . state = 'full' , full . push ( c ) )
285+ ? move ( c , busy )
286+ : move ( c , full )
349287 }
350288
351289 function cancel ( query ) {
352290 return new Promise ( ( resolve , reject ) => {
353291 query . state
354292 ? query . active
355- ? Connection ( options , { } ) . cancel ( query . state , resolve , reject )
293+ ? Connection ( options ) . cancel ( query . state , resolve , reject )
356294 : query . cancelled = { resolve, reject }
357295 : (
358296 queries . remove ( query ) ,
@@ -386,21 +324,17 @@ function Postgres(a, b) {
386324 }
387325
388326 function connect ( c , query ) {
389- c . state = 'connecting'
390- connecting . push ( c )
327+ move ( c , connecting )
391328 c . connect ( query )
392329 }
393330
394331 function onend ( c ) {
395- queues [ c . state ] . remove ( c )
396- c . state = 'ended'
397- ended . push ( c )
332+ move ( c , ended )
398333 }
399334
400335 function onopen ( c ) {
401- queues [ c . state ] . remove ( c )
402336 if ( queries . length === 0 )
403- return ( c . state = 'open' , open . push ( c ) )
337+ return move ( c , open )
404338
405339 let max = Math . ceil ( queries . length / ( connecting . length + 1 ) )
406340 , ready = true
@@ -409,23 +343,15 @@ function Postgres(a, b) {
409343 ready = c . execute ( queries . shift ( ) )
410344
411345 ready
412- ? ( c . state = 'busy' , busy . push ( c ) )
413- : ( c . state = 'full' , full . push ( c ) )
414- }
415-
416- function ondrain ( c ) {
417- full . remove ( c )
418- onopen ( c )
346+ ? move ( c , busy )
347+ : move ( c , full )
419348 }
420349
421350 function onclose ( c ) {
422- queues [ c . state ] . remove ( c )
423- c . state = 'closed'
351+ move ( c , closed )
424352 c . reserved = null
425353 options . onclose && options . onclose ( c . id )
426- queries . length
427- ? connect ( c , queries . shift ( ) )
428- : queues . closed . push ( c )
354+ queries . length && connect ( c , queries . shift ( ) )
429355 }
430356}
431357
@@ -468,7 +394,8 @@ function parseOptions(a, b) {
468394 debug : o . debug ,
469395 fetch_types : 'fetch_types' in o ? o . fetch_types : true ,
470396 parameters : { } ,
471- shared : { retries : 0 , typeArrayMap : { } }
397+ shared : { retries : 0 , typeArrayMap : { } } ,
398+ publications : o . publications || query . get ( 'publications' ) || 'alltables'
472399 } ,
473400 mergeUserTypes ( o . types )
474401 )
0 commit comments