11const fs = require ( 'fs' )
22const Url = require ( 'url' )
3+ const Stream = require ( 'stream' )
34const Connection = require ( './connection.js' )
45const Queue = require ( './queue.js' )
56const { errors, PostgresError } = require ( './errors.js' )
@@ -146,7 +147,7 @@ function Postgres(a, b) {
146147 } )
147148 . then ( begin && ( ( ) => {
148149 connections . push ( connection )
149- next ( )
150+ next ( connection )
150151 } ) )
151152
152153 function scoped ( xs ) {
@@ -158,7 +159,8 @@ function Postgres(a, b) {
158159 let c
159160 , x
160161
161- while ( queries . length && ( c = getConnection ( queries . peek ( ) . fn ) ) && ( x = queries . shift ( ) ) ) {
162+ while ( ( x = queries . peek ( ) ) && ( c = x . query && x . query . connection || getConnection ( queries . peek ( ) . fn ) ) && queries . shift ( ) ) {
163+ x . query && x . query . connection && x . query . writable && ( c . blocked = true )
162164 x . fn
163165 ? transaction ( x , c )
164166 : send ( c , x . query , x . xs , x . args )
@@ -205,9 +207,12 @@ function Postgres(a, b) {
205207 }
206208
207209 function send ( connection , query , xs , args ) {
208- connection
209- ? process . nextTick ( connection . send , query , query . tagged ? parseTagged ( query , xs , args ) : parseUnsafe ( query , xs , args ) )
210- : queries . push ( { query, xs, args } )
210+ connection && ( query . connection = connection )
211+ if ( ! connection || connection . blocked )
212+ return queries . push ( { query, xs, args, connection } )
213+
214+ connection . blocked = query . blocked
215+ process . nextTick ( connection . send , query , query . tagged ? parseTagged ( query , xs , args ) : parseUnsafe ( query , xs , args ) )
211216 }
212217
213218 function getConnection ( reserve ) {
@@ -325,9 +330,15 @@ function Postgres(a, b) {
325330 }
326331
327332 function addMethods ( promise , query ) {
333+ promise . readable = ( ) => readable ( promise , query )
334+ promise . writable = ( ) => writable ( promise , query )
328335 promise . raw = ( ) => ( query . raw = true , promise )
329336 promise . stream = ( fn ) => ( query . stream = fn , promise )
330- promise . cursor = ( rows , fn ) => {
337+ promise . cursor = cursor ( promise , query )
338+ }
339+
340+ function cursor ( promise , query ) {
341+ return ( rows , fn ) => {
331342 if ( typeof rows === 'function' ) {
332343 fn = rows
333344 rows = 1
@@ -339,6 +350,53 @@ function Postgres(a, b) {
339350 }
340351 }
341352
353+ function readable ( promise , query ) {
354+ query . connection
355+ ? query . connection . blocked = true
356+ : query . blocked = true
357+
358+ const read = ( ) => query . connection . socket . isPaused ( ) && query . connection . socket . resume ( )
359+ promise . catch ( err => query . readable . destroy ( err ) ) . then ( ( ) => {
360+ query . connection . blocked = false
361+ read ( )
362+ next ( )
363+ } )
364+ return query . readable = new Stream . Readable ( { read } )
365+ }
366+
367+ function writable ( promise , query ) {
368+ query . connection
369+ ? query . connection . blocked = true
370+ : query . blocked = true
371+ let error
372+ query . prepare = false
373+ query . simple = true
374+ query . writable = [ ]
375+ promise . catch ( err => error = err ) . then ( ( ) => {
376+ query . connection . blocked = false
377+ next ( )
378+ } )
379+ return query . readable = new Stream . Duplex ( {
380+ read ( ) { /* backpressure handling not possible */ } ,
381+ write ( chunk , encoding , callback ) {
382+ error
383+ ? callback ( error )
384+ : query . writable . push ( { chunk, callback } )
385+ } ,
386+ destroy ( error , callback ) {
387+ query . writable . push ( { error } )
388+ callback ( error )
389+ } ,
390+ final ( callback ) {
391+ if ( error )
392+ return callback ( error )
393+
394+ query . writable . push ( { chunk : null } )
395+ promise . then ( ( ) => callback ( ) , callback )
396+ }
397+ } )
398+ }
399+
342400 function listen ( channel , fn ) {
343401 const listener = getListener ( )
344402
0 commit comments