Skip to content

Conversation

@foxfriends
Copy link

@foxfriends foxfriends commented Dec 29, 2025

Added support for PostgreSQL LISTEN/NOTIFY by lightly wrapping the implementation in pgo.

As in pgo:

  • The notifications listener is started as a new process, different from a regular connection.
  • The listen(conn, channel) function subscribes to a Postgres channel.
  • unlisten(conn, reference) can be used to unsubscribe.
  • Notifications from Postgres are sent to the process that called listen as the Notify type. Receive these using the pog.notification_selector().

General usage example:

let assert Ok(db) = pog.default_config("db")
    |> pog.start()

let assert Ok(notifications) = pog.default_config("notifications")
    |> pog.start_notifications()

let assert Ok(reference) = pog.listen(notifications.data, "some_channel")

let selector = pog.notification_selector()

let assert Ok(_) = pog.query("NOTIFY some_channel, 'some_payload'")
    |> pog.execute(db.data)

let assert Ok(Notify(pid, ref, chan, data)) = process.selector_receive(selector, 100)
assert pid == notifications.pid
assert ref == reference
assert chan == "some_channel"
assert data = "some_payload"

pog.unlisten(notifications.data, reference)

Closes #13

@foxfriends foxfriends force-pushed the main branch 3 times, most recently from 4184091 to dae5cc3 Compare December 31, 2025 19:05
Added support for PostgreSQL `LISTEN/NOTIFY` by lightly wrapping the
implementation in `pgo`.

As in `pgo`:
* The notifications listener is started as a new process, different from a regular connection.
* The `listen(conn, channel)` function subscribes to a Postgres channel.
* `unlisten(conn, reference)` can be used to unsubscribe.
* Notifications from Postgres are sent to the process that called `listen` as the `Notify` type. Receive these using the `pog.notification_selector()`.

General usage example:

```gleam
let assert Ok(db) = pog.default_config("db")
    |> pog.start()

let assert Ok(notifications) = pog.default_config("notifications")
    |> pog.start_notifications()

let assert Ok(reference) = pog.listen(notifications.data, "some_channel")

let selector = pog.notification_selector()

let assert Ok(_) = pog.query("NOTIFY some_channel, 'some_payload'")
    |> pog.execute(db.data)

let assert Ok(Notify(pid, ref, chan, data)) = process.selector_receive(selector, 100)
assert pid == notifications.pid
assert ref == reference
assert chan == "some_channel"
assert data = "some_payload"

pog.unlisten(notifications.data, reference)
```
@foxfriends
Copy link
Author

erleans/pgo#119

I found and patched a bug in the underlying pgo implementation, so I suggest waiting until that's merged before considering accepting this PR.

Having been trying out my patch for a few days in a project, I am finding it to work fine, but maybe is not the most idiomatic Gleam; pretty new to it all, and not sure the best way to represent this system, I went with a "light" approach so far, mimicking the pgo interface, but open to suggestions to make it more "Gleam" flavoured for sure. :)

Copy link
Owner

@lpil lpil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I've left some questions inline

///
/// If no notifications process has been started using this name then
/// listeners using this connection will fail.
///
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this documentation correct? It doesn't seem to match what the function does.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is correct, I added a test that confirms it: if the name is used to listen, but no notifications process is started, the listen function returns Error(Nil) as expected.

fn decode_notification(dyn: dynamic.Dynamic) -> Result(Notification, Nil)

type Tag {
Notification
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a very generic tag, the chance of collisions is extremely high. Is there no way to use a unique tag to prevent runtime errors?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can tell, there is no way to change the tag, the notification atom is used by the pgo_notifications, and we just need to receive that:

notify(ListenerChannels, #notification_response{channel=Channel,
                                                payload=Payload}) ->
    case maps:find(Channel, ListenerChannels) of
        {ok, Listeners} ->
            maps:foreach(fun(Ref, Pid) ->
                                 Pid ! {notification, self(), Ref, Channel, Payload}
                         end, Listeners);
        error ->
            ok
    end.

Unless there is some other way to map notifications from Erlang code to Gleam that I have not found?

end,
pgo_pool:start_link(PoolName, Options2).

start_notifications(Config) ->
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this code the same as the existing start function?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly the same, the last two lines are different:

start(Config) ->
    ...
    pgo_pool:start_link(PoolName, Options2).

start_notifications(Config) ->
    ...
    Options3 = normalize_pool_config(Options2),
    pgo_notifications:start_link({local, PoolName}, Options3).

When I was digging through the pgo code, it seems that the equivalent normalize_pool_config was called internally to pgo when starting a regular pool, but when starting a notifications pool they did not call it, so it had to be added here manually.

The base options part is the same though, I can extract the options handling to a new function.

@foxfriends
Copy link
Author

Thanks for the notes, I have updated it, let me know if anything else :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

LISTEN/NOTIFY support?

2 participants