@@ -11,6 +11,7 @@ module.exports = Subscribe;function Subscribe(postgres, options) {
1111
1212 const sql = subscribe . sql = postgres ( {
1313 ...options ,
14+ transform : { column : { } , value : { } , row : { } } ,
1415 max : 1 ,
1516 fetch_types : false ,
1617 idle_timeout : null ,
@@ -103,7 +104,7 @@ module.exports = Subscribe;function Subscribe(postgres, options) {
103104
104105 function data ( x ) {
105106 if ( x [ 0 ] === 0x77 )
106- parse ( x . slice ( 25 ) , state , sql . options . parsers , handle )
107+ parse ( x . subarray ( 25 ) , state , sql . options . parsers , handle , options . transform )
107108 else if ( x [ 0 ] === 0x6b && x [ 17 ] )
108109 pong ( )
109110 }
@@ -136,15 +137,15 @@ function Time(x) {
136137 return new Date ( Date . UTC ( 2000 , 0 , 1 ) + Number ( x / BigInt ( 1000 ) ) )
137138}
138139
139- function parse ( x , state , parsers , handle ) {
140+ function parse ( x , state , parsers , handle , transform ) {
140141 const char = ( acc , [ k , v ] ) => ( acc [ k . charCodeAt ( 0 ) ] = v , acc )
141142
142143 Object . entries ( {
143144 R : x => { // Relation
144145 let i = 1
145146 const r = state [ x . readUInt32BE ( i ) ] = {
146- schema : String ( x . slice ( i += 4 , i = x . indexOf ( 0 , i ) ) ) || 'pg_catalog' ,
147- table : String ( x . slice ( i + 1 , i = x . indexOf ( 0 , i + 1 ) ) ) ,
147+ schema : x . toString ( 'utf8' , i += 4 , i = x . indexOf ( 0 , i ) ) || 'pg_catalog' ,
148+ table : x . toString ( 'utf8' , i + 1 , i = x . indexOf ( 0 , i + 1 ) ) ,
148149 columns : Array ( x . readUInt16BE ( i += 2 ) ) ,
149150 keys : [ ]
150151 }
@@ -156,7 +157,9 @@ function parse(x, state, parsers, handle) {
156157 while ( i < x . length ) {
157158 column = r . columns [ columnIndex ++ ] = {
158159 key : x [ i ++ ] ,
159- name : String ( x . slice ( i , i = x . indexOf ( 0 , i ) ) ) ,
160+ name : transform . column . from
161+ ? transform . column . from ( x . toString ( 'utf8' , i , i = x . indexOf ( 0 , i ) ) )
162+ : x . toString ( 'utf8' , i , i = x . indexOf ( 0 , i ) ) ,
160163 type : x . readUInt32BE ( i += 1 ) ,
161164 parser : parsers [ x . readUInt32BE ( i ) ] ,
162165 atttypmod : x . readUInt32BE ( i += 4 )
@@ -170,13 +173,12 @@ function parse(x, state, parsers, handle) {
170173 O : ( ) => { /* noop */ } , // Origin
171174 B : x => { // Begin
172175 state . date = Time ( x . readBigInt64BE ( 9 ) )
173- state . lsn = x . slice ( 1 , 9 )
176+ state . lsn = x . subarray ( 1 , 9 )
174177 } ,
175178 I : x => { // Insert
176179 let i = 1
177180 const relation = state [ x . readUInt32BE ( i ) ]
178- const row = { }
179- tuples ( x , row , relation . columns , i += 7 )
181+ const { row } = tuples ( x , relation . columns , i += 7 , transform )
180182
181183 handle ( row , {
182184 command : 'insert' ,
@@ -188,13 +190,10 @@ function parse(x, state, parsers, handle) {
188190 const relation = state [ x . readUInt32BE ( i ) ]
189191 i += 4
190192 const key = x [ i ] === 75
191- const row = key || x [ i ] === 79
192- ? { }
193+ handle ( key || x [ i ] === 79
194+ ? tuples ( x , key ? relation . keys : relation . columns , i += 3 , transform ) . row
193195 : null
194-
195- tuples ( x , row , key ? relation . keys : relation . columns , i += 3 )
196-
197- handle ( row , {
196+ , {
198197 command : 'delete' ,
199198 relation,
200199 key
@@ -205,35 +204,36 @@ function parse(x, state, parsers, handle) {
205204 const relation = state [ x . readUInt32BE ( i ) ]
206205 i += 4
207206 const key = x [ i ] === 75
208- const old = key || x [ i ] === 79
209- ? { }
207+ const xs = key || x [ i ] === 79
208+ ? tuples ( x , key ? relation . keys : relation . columns , i += 3 , transform )
210209 : null
211210
212- old && ( i = tuples ( x , old , key ? relation . keys : relation . columns , i += 3 ) )
211+ xs && ( i = xs . i )
213212
214- const row = { }
215- tuples ( x , row , relation . columns , i + 3 )
213+ const { row } = tuples ( x , relation . columns , i + 3 , transform )
216214
217215 handle ( row , {
218216 command : 'update' ,
219217 relation,
220218 key,
221- old
219+ old : xs && xs . row
222220 } )
223221 } ,
224222 T : ( ) => { /* noop */ } , // Truncate,
225223 C : ( ) => { /* noop */ } // Commit
226224 } ) . reduce ( char , { } ) [ x [ 0 ] ] ( x )
227225}
228226
229- function tuples ( x , row , columns , xi ) {
227+ function tuples ( x , columns , xi , transform ) {
230228 let type
231229 , column
230+ , value
232231
232+ const row = transform . raw ? new Array ( columns . length ) : { }
233233 for ( let i = 0 ; i < columns . length ; i ++ ) {
234234 type = x [ xi ++ ]
235235 column = columns [ i ]
236- row [ column . name ] = type === 110 // n
236+ value = type === 110 // n
237237 ? null
238238 : type === 117 // u
239239 ? undefined
@@ -242,9 +242,18 @@ function tuples(x, row, columns, xi) {
242242 : column . parser . array === true
243243 ? column . parser ( x . toString ( 'utf8' , xi + 5 , xi += 4 + x . readUInt32BE ( xi ) ) )
244244 : column . parser ( x . toString ( 'utf8' , xi + 4 , xi += 4 + x . readUInt32BE ( xi ) ) )
245+
246+ transform . raw
247+ ? ( row [ i ] = transform . raw === true
248+ ? value
249+ : transform . value . from ? transform . value . from ( value , column ) : value )
250+ : ( row [ column . name ] = transform . value . from
251+ ? transform . value . from ( value , column )
252+ : value
253+ )
245254 }
246255
247- return xi
256+ return { i : xi , row : transform . row . from ? transform . row . from ( row ) : row }
248257}
249258
250259function parseEvent ( x ) {
0 commit comments