44 * Holds global app data like user.
55 */
66
7+ import { PGliteInterface } from '@electric-sql/pglite'
78import { User } from '@supabase/supabase-js'
9+ import { Mutex } from 'async-mutex'
810import {
911 createContext ,
1012 PropsWithChildren ,
@@ -120,8 +122,6 @@ export default function AppProvider({ children }: AppProps) {
120122 throw new Error ( 'dbManager is not available' )
121123 }
122124
123- const db = await dbManager . getDbInstance ( databaseId )
124-
125125 const databaseHostname = `${ databaseId } .${ process . env . NEXT_PUBLIC_BROWSER_PROXY_DOMAIN } `
126126
127127 const ws = new WebSocket ( `wss://${ databaseHostname } ` )
@@ -132,30 +132,32 @@ export default function AppProvider({ children }: AppProps) {
132132 setLiveSharedDatabaseId ( databaseId )
133133 }
134134
135- ws . onmessage = async ( event ) => {
136- const message = new Uint8Array ( await event . data )
137-
138- if ( isStartupMessage ( message ) ) {
139- const parameters = parseStartupMessage ( message )
140- if ( 'client_ip' in parameters ) {
141- // client disconnected
142- if ( parameters . client_ip === '' ) {
143- setConnectedClientIp ( null )
144- // we ensure we're not in a transaction block first
145- await db . sql `rollback;` . catch ( )
146- // we clean the session state, see: https://www.pgbouncer.org/faq.html#how-to-use-prepared-statements-with-session-pooling
147- // we do this to avoid having old prepared statements in the session
148- await db . sql `discard all;`
149- } else {
150- setConnectedClientIp ( parameters . client_ip )
135+ const mutex = new Mutex ( )
136+ let db : PGliteInterface
137+
138+ ws . onmessage = ( event ) => {
139+ mutex . runExclusive ( async ( ) => {
140+ const message = new Uint8Array ( await event . data )
141+
142+ if ( isStartupMessage ( message ) ) {
143+ const parameters = parseStartupMessage ( message )
144+ if ( 'client_ip' in parameters ) {
145+ // client disconnected
146+ if ( parameters . client_ip === '' ) {
147+ setConnectedClientIp ( null )
148+ await dbManager . closeDbInstance ( databaseId )
149+ } else {
150+ db = await dbManager . getDbInstance ( databaseId )
151+ setConnectedClientIp ( parameters . client_ip )
152+ }
151153 }
154+ return
152155 }
153- return
154- }
155156
156- const response = await db . execProtocolRaw ( message )
157+ const response = await db . execProtocolRaw ( message )
157158
158- ws . send ( response )
159+ ws . send ( response )
160+ } )
159161 }
160162 ws . onclose = ( event ) => {
161163 cleanUp ( )
0 commit comments