diff --git a/pom.xml b/pom.xml
index 1f60e61..b68bdd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,17 +1,12 @@
4.0.0
- com.github.aslakhellesoy
+ com.whizzosoftware
eventsource-client
${project.artifactId}
A Java EventSource Client
- http://aslakhellesoy.github.com/eventsource-java
- 0.1.2.1
+ http://whizzosoftware.github.com/eventsource-java
+ 0.1.3-SNAPSHOT
jar
-
- org.sonatype.oss
- oss-parent
- 6
-
BSD License
@@ -19,18 +14,13 @@
repo
-
- scm:git:git://github.com/aslakhellesoy/eventsource-java.git
- scm:git:git@github.com:aslakhellesoy/eventsource-java.git
- git://github.com/aslakhellesoy/eventsource-java.git
-
-
-
- repository.jboss.org
- http://repository.jboss.org/nexus/content/groups/public/
-
-
+
+ org.slf4j
+ slf4j-api
+ 1.7.6
+ compile
+
org.jboss.netty
netty
@@ -67,23 +57,18 @@
1.6
-
- org.apache.maven.plugins
- maven-gpg-plugin
- 1.1
-
- true
-
-
-
- sign-artifacts
- verify
-
- sign
-
-
-
-
+
+
+ nx-releases
+ Nexus Releases
+ ${nexus.releases.url}
+
+
+ nx-snapshots
+ Nexus Snapshots
+ ${nexus.snapshots.url}
+
+
diff --git a/src/main/java/com/github/eventsource/client/EventSource.java b/src/main/java/com/github/eventsource/client/EventSource.java
index 17d8cce..11d12ae 100644
--- a/src/main/java/com/github/eventsource/client/EventSource.java
+++ b/src/main/java/com/github/eventsource/client/EventSource.java
@@ -12,13 +12,15 @@
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+import javax.net.ssl.SSLEngine;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-public class EventSource {
+public class EventSource implements EventSourceHandler {
public static final long DEFAULT_RECONNECTION_TIME_MILLIS = 2000;
public static final int CONNECTING = 0;
@@ -27,7 +29,9 @@ public class EventSource {
private final ClientBootstrap bootstrap;
private final EventSourceChannelHandler clientHandler;
+ private final EventSourceHandler eventSourceHandler;
+ private URI uri;
private int readyState;
/**
@@ -44,17 +48,35 @@ public class EventSource {
* @see #close()
*/
public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
+ this(executor, reconnectionTimeMillis, uri, null, eventSourceHandler);
+ }
+
+ public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, final SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+ this.eventSourceHandler = eventSourceHandler;
+
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor()));
- bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort()));
+ this.uri = uri;
+
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), getPort(uri)));
- clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri);
+ // add this class as the event source handler so the connect() call can be intercepted
+ AsyncEventSourceHandler asyncHandler = new AsyncEventSourceHandler(executor, this);
+
+ clientHandler = new EventSourceChannelHandler(asyncHandler, reconnectionTimeMillis, bootstrap, uri);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
+
+ if (uri.getScheme().equals("https") && sslEngineProvider != null) {
+ SSLEngine engine = sslEngineProvider.createSSLEngine();
+ engine.setUseClientMode(true);
+ pipeline.addLast("ssl", new SslHandler(engine));
+ }
+
pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
pipeline.addLast("string", new StringDecoder());
@@ -66,24 +88,51 @@ public ChannelPipeline getPipeline() throws Exception {
}
public EventSource(String uri, EventSourceHandler eventSourceHandler) {
- this(URI.create(uri), eventSourceHandler);
+ this(uri, null, eventSourceHandler);
+ }
+
+ public EventSource(String uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+ this(URI.create(uri), sslEngineProvider, eventSourceHandler);
}
public EventSource(URI uri, EventSourceHandler eventSourceHandler) {
- this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler);
+ this(uri, null, eventSourceHandler);
+ }
+
+ public EventSource(URI uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+ this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, sslEngineProvider, eventSourceHandler);
+ }
+
+ /**
+ * Sets a custom HTTP header that will be used when the request is made to establish the SSE channel.
+ *
+ * @param name the HTTP header name
+ * @param value the header value
+ */
+ public void setCustomRequestHeader(String name, String value) {
+ clientHandler.setCustomRequestHeader(name, value);
}
public ChannelFuture connect() {
readyState = CONNECTING;
+
+ //To avoid perpetual "SocketUnresolvedException"
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), getPort(uri)));
+
return bootstrap.connect();
}
+ public boolean isConnected() {
+ return (readyState == OPEN);
+ }
+
/**
* Close the connection
*
* @return self
*/
public EventSource close() {
+ readyState = CLOSED;
clientHandler.close();
return this;
}
@@ -98,4 +147,38 @@ public EventSource join() throws InterruptedException {
clientHandler.join();
return this;
}
+
+ @Override
+ public void onConnect() throws Exception {
+ // flag the connection as open
+ readyState = OPEN;
+
+ // pass event to the proper handler
+ eventSourceHandler.onConnect();
+ }
+
+ @Override
+ public void onMessage(String event, MessageEvent message) throws Exception {
+ // pass event to the proper handler
+ eventSourceHandler.onMessage(event, message);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // pass event to the proper handler
+ eventSourceHandler.onError(t);
+ }
+
+ @Override
+ public void onClosed(boolean willReconnect) {
+ eventSourceHandler.onClosed(willReconnect);
+ }
+
+ protected int getPort(URI uri) {
+ int port = uri.getPort();
+ if (port == -1) {
+ port = (uri.getScheme().equals("https")) ? 443 : 80;
+ }
+ return port;
+ }
}
diff --git a/src/main/java/com/github/eventsource/client/EventSourceHandler.java b/src/main/java/com/github/eventsource/client/EventSourceHandler.java
index b50b3d0..2ec35bf 100644
--- a/src/main/java/com/github/eventsource/client/EventSourceHandler.java
+++ b/src/main/java/com/github/eventsource/client/EventSourceHandler.java
@@ -4,4 +4,5 @@ public interface EventSourceHandler {
void onConnect() throws Exception;
void onMessage(String event, MessageEvent message) throws Exception;
void onError(Throwable t);
+ void onClosed(boolean willReconnect);
}
diff --git a/src/main/java/com/github/eventsource/client/SSLEngineProvider.java b/src/main/java/com/github/eventsource/client/SSLEngineProvider.java
new file mode 100644
index 0000000..9f6ac28
--- /dev/null
+++ b/src/main/java/com/github/eventsource/client/SSLEngineProvider.java
@@ -0,0 +1,7 @@
+package com.github.eventsource.client;
+
+import javax.net.ssl.SSLEngine;
+
+public interface SSLEngineProvider {
+ SSLEngine createSSLEngine();
+}
\ No newline at end of file
diff --git a/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java b/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java
index 89864f2..63f1943 100644
--- a/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java
+++ b/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java
@@ -55,4 +55,18 @@ public void run() {
}
});
}
+
+ @Override
+ public void onClosed(final boolean willReconnect) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ eventSourceHandler.onClosed(willReconnect);
+ } catch (Exception e) {
+ onError(e);
+ }
+ }
+ });
+ }
}
diff --git a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java
index 6ad7d7b..0a7f669 100644
--- a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java
+++ b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java
@@ -5,7 +5,15 @@
import com.github.eventsource.client.impl.ConnectionHandler;
import com.github.eventsource.client.impl.EventStreamParser;
import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.*;
+
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -17,7 +25,10 @@
import org.jboss.netty.util.TimerTask;
import java.net.ConnectException;
+import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
@@ -25,12 +36,13 @@
public class EventSourceChannelHandler extends SimpleChannelUpstreamHandler implements ConnectionHandler {
private static final Pattern STATUS_PATTERN = Pattern.compile("HTTP/1.1 (\\d+) (.*)");
- private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream");
+ private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream", Pattern.CASE_INSENSITIVE);
private final EventSourceHandler eventSourceHandler;
private final ClientBootstrap bootstrap;
private final URI uri;
private final EventStreamParser messageDispatcher;
+ private final Map customRequestHeaders = new HashMap();
private final Timer timer = new HashedWheelTimer();
private Channel channel;
@@ -57,14 +69,23 @@ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exc
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString());
+ final String query = uri.getQuery();
+ final String path = uri.getPath() + (((null != query) && !query.isEmpty()) ? "?" + query : "");
+ final int port = uri.getPort();
+ final String portPostfix = ((port != -1) ? ":" + port : "");
+ final String host = uri.getHost() + portPostfix;
+ final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
request.addHeader(Names.ACCEPT, "text/event-stream");
- request.addHeader(Names.HOST, uri.getHost());
- request.addHeader(Names.ORIGIN, "http://" + uri.getHost());
+ request.addHeader(Names.HOST, host);
+ request.addHeader(Names.ORIGIN, uri.getScheme() + "://" + host);
request.addHeader(Names.CACHE_CONTROL, "no-cache");
if (lastEventId != null) {
request.addHeader("Last-Event-ID", lastEventId);
}
+ // add any custom headers that have been set
+ for (String name : customRequestHeaders.keySet()) {
+ request.addHeader(name, customRequestHeaders.get(name));
+ }
e.getChannel().write(request);
channel = e.getChannel();
}
@@ -76,6 +97,10 @@ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ if (eventStreamOk) {
+ // call onClosed only if it was successfully opened (and onConnect was called)
+ eventSourceHandler.onClosed(reconnectOnClose);
+ }
if (reconnectOnClose) {
reconnect();
}
@@ -150,16 +175,28 @@ public EventSourceChannelHandler join() throws InterruptedException {
return this;
}
+ /**
+ * Sets a custom HTTP header that will be used when the request is made to establish the SSE channel.
+ *
+ * @param name the HTTP header name
+ * @param value the header value
+ */
+ public void setCustomRequestHeader(String name, String value) {
+ customRequestHeaders.put(name, value);
+ }
+
private void reconnect() {
- if(!reconnecting.get()) {
- reconnecting.set(true);
+ if (reconnecting.compareAndSet(false, true)) {
+ headerDone = false;
+ eventStreamOk = false;
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
reconnecting.set(false);
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort()));
bootstrap.connect().await();
}
}, reconnectionTimeMillis, TimeUnit.MILLISECONDS);
}
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/com/github/eventsource/client/stubs/StubHandler.java b/src/main/java/com/github/eventsource/client/stubs/StubHandler.java
index 5977afb..01856db 100644
--- a/src/main/java/com/github/eventsource/client/stubs/StubHandler.java
+++ b/src/main/java/com/github/eventsource/client/stubs/StubHandler.java
@@ -32,6 +32,11 @@ public void onConnect() throws Exception {
connected = true;
}
+ @Override
+ public void onClosed(boolean willReconnect){
+ connected = false;
+ }
+
@Override
public void onMessage(String event, MessageEvent message) throws Exception {
getMessageEvents(event).add(message);
diff --git a/src/test/java/com/github/eventsource/client/DebugClient.java b/src/test/java/com/github/eventsource/client/DebugClient.java
index 3c11876..5ce75fb 100644
--- a/src/test/java/com/github/eventsource/client/DebugClient.java
+++ b/src/test/java/com/github/eventsource/client/DebugClient.java
@@ -21,6 +21,11 @@ public void onError(Throwable t) {
System.err.println("ERROR");
t.printStackTrace();
}
+
+ @Override
+ public void onClosed(boolean willReconnect) {
+ System.err.println("CLOSED");
+ }
});
es.connect();
diff --git a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java
index 02fb54a..27c44d9 100644
--- a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java
+++ b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java
@@ -105,6 +105,11 @@ public void onError(Throwable t) {
System.out.println("ERROR: " + t);
errorCountdown.countDown();
}
+
+ @Override
+ public void onClosed(boolean willReconnect) {
+ System.out.println("CLOSED");
+ }
});
eventSource.connect();
@@ -147,6 +152,10 @@ public void onMessage(String event, com.github.eventsource.client.MessageEvent m
public void onError(Throwable t) {
errorCountdown.countDown();
}
+
+ @Override
+ public void onClosed(boolean willReconnect) {
+ }
});
eventSource.connect().await();
}