diff --git a/.gitignore b/.gitignore
index 20502b0..5a919a8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
target
out
*.iws
+*.iml
+*.ipr
*.swp
.idea
release.properties
diff --git a/eventsource-client.iml b/eventsource-client.iml
deleted file mode 100644
index cd369a0..0000000
--- a/eventsource-client.iml
+++ /dev/null
@@ -1,25 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/eventsource-client.ipr b/eventsource-client.ipr
deleted file mode 100644
index 95c6fbf..0000000
--- a/eventsource-client.ipr
+++ /dev/null
@@ -1,294 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- -
-
-
- -
-
-
- -
-
-
- -
-
-
- -
-
-
-
-
-
- -
-
-
-
-
-
- -
-
-
-
-
-
- -
-
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
- -
-
-
- -
-
-
- -
-
-
- -
-
-
-
-
- -
-
-
- -
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- http://www.w3.org/1999/xhtml
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/pom.xml b/pom.xml
index 1f60e61..64dc0f8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,17 +1,12 @@
4.0.0
- com.github.aslakhellesoy
+ com.github
eventsource-client
- ${project.artifactId}
+ Java EventSource Client
A Java EventSource Client
- http://aslakhellesoy.github.com/eventsource-java
- 0.1.2.1
+ 0.6-SNAPSHOT
jar
-
- org.sonatype.oss
- oss-parent
- 6
-
+
BSD License
@@ -19,17 +14,24 @@
repo
+
- scm:git:git://github.com/aslakhellesoy/eventsource-java.git
- scm:git:git@github.com:aslakhellesoy/eventsource-java.git
- git://github.com/aslakhellesoy/eventsource-java.git
+ scm:git:https://github.com/andll/eventsource-java.git
+ scm:git:https://github.com/andll/eventsource-java.git
+ git://github.com/andll/eventsource-java.git
-
+
+
- repository.jboss.org
- http://repository.jboss.org/nexus/content/groups/public/
+ mind.releases
+ http://nexus.mindlabs.com/content/repositories/mind.releases
-
+
+ mind.snapshots
+ http://nexus.mindlabs.com/content/repositories/mind.snapshots
+
+
+
org.jboss.netty
@@ -67,23 +69,33 @@
1.6
-
- org.apache.maven.plugins
- maven-gpg-plugin
- 1.1
-
- true
-
-
-
- sign-artifacts
- verify
-
- sign
-
-
-
-
+
+
+
+ sign
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.1
+
+ true
+
+
+
+ sign-artifacts
+ verify
+
+ sign
+
+
+
+
+
+
+
+
diff --git a/src/main/java/com/github/eventsource/client/EventSource.java b/src/main/java/com/github/eventsource/client/EventSource.java
index 17d8cce..ffd010d 100644
--- a/src/main/java/com/github/eventsource/client/EventSource.java
+++ b/src/main/java/com/github/eventsource/client/EventSource.java
@@ -2,67 +2,35 @@
import com.github.eventsource.client.impl.AsyncEventSourceHandler;
import com.github.eventsource.client.impl.netty.EventSourceChannelHandler;
-import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.Delimiters;
-import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
-import org.jboss.netty.handler.codec.string.StringDecoder;
-import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-public class EventSource {
+public class EventSource {
public static final long DEFAULT_RECONNECTION_TIME_MILLIS = 2000;
- public static final int CONNECTING = 0;
- public static final int OPEN = 1;
- public static final int CLOSED = 2;
-
- private final ClientBootstrap bootstrap;
private final EventSourceChannelHandler clientHandler;
- private int readyState;
-
/**
- * Creates a new EventSource client. The client will reconnect on
- * lost connections automatically, unless the connection is closed explicitly by a call to
+ * Creates a new EventSource client. The client will reconnect on
+ * lost connections automatically, unless the connection is closed explicitly by a call to
* {@link com.github.eventsource.client.EventSource#close()}.
*
* For sample usage, see examples at GitHub.
- *
- * @param executor the executor that will receive events
+ *
+ * @param eventSourceClient EventSourceClient to start event source at
* @param reconnectionTimeMillis delay before a reconnect is made - in the event of a lost connection
* @param uri where to connect
* @param eventSourceHandler receives events
* @see #close()
*/
- public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
- bootstrap = new ClientBootstrap(
- new NioClientSocketChannelFactory(
- Executors.newSingleThreadExecutor(),
- Executors.newSingleThreadExecutor()));
- bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort()));
-
- clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri);
-
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
- pipeline.addLast("string", new StringDecoder());
+ public EventSource(EventSourceClient eventSourceClient, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
+ clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(eventSourceClient.getEventExecutor(), eventSourceHandler), reconnectionTimeMillis, eventSourceClient, uri);
+ }
- pipeline.addLast("encoder", new HttpRequestEncoder());
- pipeline.addLast("es-handler", clientHandler);
- return pipeline;
- }
- });
+ public EventSource(Executor eventExecutor, long reconnectionTimeMillis, URI uri, EventSourceHandler eventSourceHandler) {
+ this(new EventSourceClient(eventExecutor), reconnectionTimeMillis, uri, eventSourceHandler);
}
public EventSource(String uri, EventSourceHandler eventSourceHandler) {
@@ -70,12 +38,11 @@ public EventSource(String uri, EventSourceHandler eventSourceHandler) {
}
public EventSource(URI uri, EventSourceHandler eventSourceHandler) {
- this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler);
+ this(new EventSourceClient(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler);
}
public ChannelFuture connect() {
- readyState = CONNECTING;
- return bootstrap.connect();
+ return clientHandler.connect();
}
/**
diff --git a/src/main/java/com/github/eventsource/client/EventSourceClient.java b/src/main/java/com/github/eventsource/client/EventSourceClient.java
new file mode 100644
index 0000000..dc30d25
--- /dev/null
+++ b/src/main/java/com/github/eventsource/client/EventSourceClient.java
@@ -0,0 +1,91 @@
+package com.github.eventsource.client;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class EventSourceClient {
+ private final ClientBootstrap bootstrap;
+ private final Executor eventExecutor;
+
+ private final HashMap handlerMap = new HashMap();
+
+ public EventSourceClient() {
+ this(Executors.newSingleThreadExecutor());
+ }
+
+ public EventSourceClient(Executor eventExecutor) {
+ this.eventExecutor = eventExecutor;
+ bootstrap = new ClientBootstrap(
+ new NioClientSocketChannelFactory(
+ Executors.newSingleThreadExecutor(),
+ Executors.newCachedThreadPool()));
+
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
+ pipeline.addLast("string", new StringDecoder());
+
+ pipeline.addLast("encoder", new HttpRequestEncoder());
+ pipeline.addLast("es-handler", new Handler());
+ return pipeline;
+ }
+ });
+ }
+
+ public ChannelFuture connect(InetSocketAddress address, ChannelUpstreamHandler handler) {
+ synchronized (handlerMap) {
+ ChannelFuture channelFuture = bootstrap.connect(address);
+ handlerMap.put(channelFuture.getChannel(), handler);
+ return channelFuture;
+ }
+ }
+
+ private class Handler extends SimpleChannelUpstreamHandler {
+ @Override
+ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
+ final ChannelUpstreamHandler handler;
+ synchronized (handlerMap) {
+ handler = handlerMap.get(ctx.getChannel());
+ }
+ if (handler == null) {
+ super.handleUpstream(ctx, e);
+
+ if (e instanceof ChannelStateEvent && ((ChannelStateEvent) e).getState() == ChannelState.OPEN) {
+ return; //Do nothing, this one will not be dispatched to handler, but it's ok
+ }
+
+ System.err.println("Something wrong with dispatching");
+ } else {
+ handler.handleUpstream(ctx, e);
+
+ if (e instanceof ChannelStateEvent) {
+ ChannelStateEvent stateEvent = (ChannelStateEvent) e;
+ if (stateEvent.getState() == ChannelState.BOUND && stateEvent.getValue() == null) {
+ synchronized (handlerMap) {
+ handlerMap.remove(ctx.getChannel());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public Executor getEventExecutor() {
+ return eventExecutor;
+ }
+
+ public void shutdown() {
+ bootstrap.releaseExternalResources();
+ }
+}
diff --git a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java
index 6ad7d7b..f301a5a 100644
--- a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java
+++ b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java
@@ -1,10 +1,10 @@
package com.github.eventsource.client.impl.netty;
+import com.github.eventsource.client.EventSourceClient;
import com.github.eventsource.client.EventSourceException;
import com.github.eventsource.client.EventSourceHandler;
import com.github.eventsource.client.impl.ConnectionHandler;
import com.github.eventsource.client.impl.EventStreamParser;
-import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
@@ -17,6 +17,7 @@
import org.jboss.netty.util.TimerTask;
import java.net.ConnectException;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,14 +26,14 @@
public class EventSourceChannelHandler extends SimpleChannelUpstreamHandler implements ConnectionHandler {
private static final Pattern STATUS_PATTERN = Pattern.compile("HTTP/1.1 (\\d+) (.*)");
- private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream");
+ private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream;?.*");
private final EventSourceHandler eventSourceHandler;
- private final ClientBootstrap bootstrap;
+ private final EventSourceClient client;
private final URI uri;
private final EventStreamParser messageDispatcher;
- private final Timer timer = new HashedWheelTimer();
+ private static final Timer TIMER = new HashedWheelTimer();
private Channel channel;
private boolean reconnectOnClose = true;
private long reconnectionTimeMillis;
@@ -42,10 +43,10 @@ public class EventSourceChannelHandler extends SimpleChannelUpstreamHandler impl
private Integer status;
private AtomicBoolean reconnecting = new AtomicBoolean(false);
- public EventSourceChannelHandler(EventSourceHandler eventSourceHandler, long reconnectionTimeMillis, ClientBootstrap bootstrap, URI uri) {
+ public EventSourceChannelHandler(EventSourceHandler eventSourceHandler, long reconnectionTimeMillis, EventSourceClient client, URI uri) {
this.eventSourceHandler = eventSourceHandler;
this.reconnectionTimeMillis = reconnectionTimeMillis;
- this.bootstrap = bootstrap;
+ this.client = client;
this.uri = uri;
this.messageDispatcher = new EventStreamParser(uri.toString(), eventSourceHandler, this);
}
@@ -143,6 +144,10 @@ public EventSourceChannelHandler close() {
return this;
}
+ public ChannelFuture connect() {
+ return client.connect(getConnectAddress(), this);
+ }
+
public EventSourceChannelHandler join() throws InterruptedException {
if (channel != null) {
channel.getCloseFuture().await();
@@ -151,15 +156,23 @@ public EventSourceChannelHandler join() throws InterruptedException {
}
private void reconnect() {
- if(!reconnecting.get()) {
- reconnecting.set(true);
- timer.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- reconnecting.set(false);
- bootstrap.connect().await();
- }
- }, reconnectionTimeMillis, TimeUnit.MILLISECONDS);
+ if(reconnectionTimeMillis >= 0) {
+ if(!reconnecting.get()) {
+ reconnecting.set(true);
+ TIMER.newTimeout(new TimerTask()
+ {
+ @Override
+ public void run(Timeout timeout) throws Exception
+ {
+ reconnecting.set(false);
+ connect().await();
+ }
+ }, reconnectionTimeMillis, TimeUnit.MILLISECONDS);
+ }
}
}
+
+ public InetSocketAddress getConnectAddress() {
+ return new InetSocketAddress(uri.getHost(), uri.getPort() == -1 ? 80 : uri.getPort());
+ }
}
\ No newline at end of file
diff --git a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java
index 02fb54a..1203795 100644
--- a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java
+++ b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java
@@ -12,7 +12,6 @@
import java.net.URI;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
@@ -84,7 +83,7 @@ public void onClose(EventSourceConnection connection) throws Exception {
webServer.start();
- eventSource = new EventSource(Executors.newSingleThreadExecutor(), 100, URI.create("http://localhost:59504/es/hello"), new EventSourceHandler() {
+ eventSource = new EventSource(new EventSourceClient(), 100, URI.create("http://localhost:59504/es/hello"), new EventSourceHandler() {
@Override
public void onConnect() {
}
@@ -129,7 +128,7 @@ private void assertSentAndReceived(final List messages) throws IOExcepti
}
private void startClient(final List expectedMessages, final CountDownLatch messageCountdown, final CountDownLatch errorCountdown, long reconnectionTimeMillis) throws InterruptedException {
- eventSource = new EventSource(Executors.newSingleThreadExecutor(), reconnectionTimeMillis, URI.create("http://localhost:59504/es/hello?echoThis=yo"), new EventSourceHandler() {
+ eventSource = new EventSource(new EventSourceClient(), reconnectionTimeMillis, URI.create("http://localhost:59504/es/hello?echoThis=yo"), new EventSourceHandler() {
int n = 0;
@Override