Skip to content

Commit 18de8cd

Browse files
authored
Merge pull request #448 from tcheeric/codex/add-non-blocking-subscription-api
chore: tidy spring websocket client files
2 parents c06f6f4 + 5968fcc commit 18de8cd

30 files changed

Lines changed: 920 additions & 221 deletions

File tree

.github/pull_request_template.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
## Why now?
1+
## Summary
22
<!-- Explain the problem, context, and why this change is needed. Link to the issue. -->
3+
34
Related issue: #____
45

56
## What changed?

.github/workflows/codex.yml

Lines changed: 0 additions & 62 deletions
This file was deleted.

.github/workflows/release-please.yml

Lines changed: 0 additions & 23 deletions
This file was deleted.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,3 +224,4 @@ data
224224

225225
# Original versions of merged files
226226
*.orig
227+
/.qodana/

PR_DRAFT.md

Lines changed: 0 additions & 30 deletions
This file was deleted.

PR_DRAFT_PR1.md

Lines changed: 0 additions & 27 deletions
This file was deleted.

README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,37 @@ See [`docs/CODEBASE_OVERVIEW.md`](docs/CODEBASE_OVERVIEW.md) for details about r
1919
## Examples
2020
Examples are located in the [`nostr-java-examples`](./nostr-java-examples) module.
2121

22+
- [`SpringSubscriptionExample`](nostr-java-examples/src/main/java/nostr/examples/SpringSubscriptionExample.java)
23+
shows how to open a non-blocking `NostrSpringWebSocketClient` subscription and close it after a
24+
fixed duration.
25+
26+
## Streaming subscriptions
27+
28+
The client and API layers expose a non-blocking streaming API for long-lived subscriptions. Use
29+
`NostrSpringWebSocketClient.subscribe` to open a REQ subscription and receive relay messages via a
30+
callback:
31+
32+
```java
33+
Filters filters = new Filters(new KindFilter<>(Kind.TEXT_NOTE));
34+
AutoCloseable subscription =
35+
client.subscribe(
36+
filters,
37+
"example-subscription",
38+
message -> {
39+
// handle EVENT/NOTICE payloads on your own executor to avoid blocking the socket thread
40+
},
41+
error -> log.warn("Subscription error", error));
42+
43+
// ... keep the subscription open while processing events ...
44+
45+
subscription.close(); // sends CLOSE to the relay and releases the underlying WebSocket
46+
```
47+
48+
Subscriptions must be closed by the caller to ensure a CLOSE frame is sent to the relay and to free
49+
the dedicated WebSocket connection created for the REQ. Callbacks run on the WebSocket thread; for
50+
high-throughput feeds, hand off work to a queue or executor to provide backpressure and keep the
51+
socket responsive.
52+
2253
## Supported NIPs
2354
The API currently implements the following [NIPs](https://github.com/nostr-protocol/nips):
2455
- [NIP-1](https://github.com/nostr-protocol/nips/blob/master/01.md) - Basic protocol flow description

docs/reference/nostr-java-api.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ Abstraction over a WebSocket connection to a relay.
6565
```java
6666
<T extends BaseMessage> List<String> send(T eventMessage) throws IOException
6767
List<String> send(String json) throws IOException
68+
AutoCloseable subscribe(String requestJson,
69+
Consumer<String> messageListener,
70+
Consumer<Throwable> errorListener,
71+
Runnable closeListener) throws IOException
72+
<T extends BaseMessage> AutoCloseable subscribe(T eventMessage,
73+
Consumer<String> messageListener,
74+
Consumer<Throwable> errorListener,
75+
Runnable closeListener) throws IOException
6876
void close() throws IOException
6977
```
7078

@@ -75,6 +83,10 @@ Spring `TextWebSocketHandler` based implementation of `WebSocketClientIF`.
7583
public StandardWebSocketClient(String relayUri)
7684
public <T extends BaseMessage> List<String> send(T eventMessage) throws IOException
7785
public List<String> send(String json) throws IOException
86+
public AutoCloseable subscribe(String requestJson,
87+
Consumer<String> messageListener,
88+
Consumer<Throwable> errorListener,
89+
Runnable closeListener) throws IOException
7890
public void close() throws IOException
7991
```
8092

@@ -84,6 +96,14 @@ Wrapper that adds retry logic around a `WebSocketClientIF`.
8496
```java
8597
public List<String> send(BaseMessage eventMessage) throws IOException
8698
public List<String> send(String json) throws IOException
99+
public AutoCloseable subscribe(BaseMessage requestMessage,
100+
Consumer<String> messageListener,
101+
Consumer<Throwable> errorListener,
102+
Runnable closeListener) throws IOException
103+
public AutoCloseable subscribe(String json,
104+
Consumer<String> messageListener,
105+
Consumer<Throwable> errorListener,
106+
Runnable closeListener) throws IOException
87107
public List<String> recover(IOException ex, String json) throws IOException
88108
public void close() throws IOException
89109
```
@@ -95,12 +115,24 @@ High level client coordinating multiple relay connections and signing.
95115
public NostrIF setRelays(Map<String,String> relays)
96116
public List<String> sendEvent(IEvent event)
97117
public List<String> sendRequest(List<Filters> filters, String subscriptionId)
118+
public AutoCloseable subscribe(Filters filters, String subscriptionId, Consumer<String> listener)
119+
public AutoCloseable subscribe(Filters filters,
120+
String subscriptionId,
121+
Consumer<String> listener,
122+
Consumer<Throwable> errorListener)
98123
public NostrIF sign(Identity identity, ISignable signable)
99124
public boolean verify(GenericEvent event)
100125
public Map<String,String> getRelays()
101126
public void close()
102127
```
103128

129+
`subscribe` opens a dedicated WebSocket per relay, returns immediately, and streams raw relay
130+
messages to the provided listener. The returned `AutoCloseable` sends a `CLOSE` command and releases
131+
resources when invoked. Because callbacks execute on the WebSocket thread, delegate heavy
132+
processing to another executor to avoid stalling inbound traffic. The
133+
[`SpringSubscriptionExample`](../../nostr-java-examples/src/main/java/nostr/examples/SpringSubscriptionExample.java)
134+
demonstrates how to open a subscription and close it after a fixed duration.
135+
104136
### Configuration
105137
- `RetryConfig` – enables Spring Retry support.
106138
- `RelaysProperties` – maps relay names to URLs via configuration properties.

nostr-java-api/src/main/java/nostr/api/NostrIF.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.IOException;
44
import java.util.List;
55
import java.util.Map;
6+
import java.util.function.Consumer;
67
import lombok.NonNull;
78
import nostr.base.IEvent;
89
import nostr.base.ISignable;
@@ -90,6 +91,34 @@ List<String> sendRequest(
9091
@NonNull String subscriptionId,
9192
Map<String, String> relays);
9293

94+
/**
95+
* Subscribe to a stream of events for the given filter on configured relays.
96+
*
97+
* @param filters the filter describing events to stream
98+
* @param subscriptionId identifier for the subscription
99+
* @param listener consumer invoked for each raw relay message
100+
* @return a handle that cancels the subscription when closed
101+
*/
102+
AutoCloseable subscribe(
103+
@NonNull Filters filters,
104+
@NonNull String subscriptionId,
105+
@NonNull Consumer<String> listener);
106+
107+
/**
108+
* Subscribe to a stream of events with custom error handling.
109+
*
110+
* @param filters the filter describing events to stream
111+
* @param subscriptionId identifier for the subscription
112+
* @param listener consumer invoked for each raw relay message
113+
* @param errorListener optional consumer invoked when a transport error occurs
114+
* @return a handle that cancels the subscription when closed
115+
*/
116+
AutoCloseable subscribe(
117+
@NonNull Filters filters,
118+
@NonNull String subscriptionId,
119+
@NonNull Consumer<String> listener,
120+
Consumer<Throwable> errorListener);
121+
93122
/**
94123
* Sign a signable object with the provided identity.
95124
*

0 commit comments

Comments
 (0)