diff --git a/.gitignore b/.gitignore index 20502b0..5a919a8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ target out *.iws +*.iml +*.ipr *.swp .idea release.properties diff --git a/eventsource-client.iml b/eventsource-client.iml deleted file mode 100644 index cd369a0..0000000 --- a/eventsource-client.iml +++ /dev/null @@ -1,25 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/eventsource-client.ipr b/eventsource-client.ipr deleted file mode 100644 index 95c6fbf..0000000 --- a/eventsource-client.ipr +++ /dev/null @@ -1,294 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - http://www.w3.org/1999/xhtml - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/pom.xml b/pom.xml index 1f60e61..64dc0f8 100644 --- a/pom.xml +++ b/pom.xml @@ -1,17 +1,12 @@ 4.0.0 - com.github.aslakhellesoy + com.github eventsource-client - ${project.artifactId} + Java EventSource Client A Java EventSource Client - http://aslakhellesoy.github.com/eventsource-java - 0.1.2.1 + 0.6-SNAPSHOT jar - - org.sonatype.oss - oss-parent - 6 - + BSD License @@ -19,17 +14,24 @@ repo + - scm:git:git://github.com/aslakhellesoy/eventsource-java.git - scm:git:git@github.com:aslakhellesoy/eventsource-java.git - git://github.com/aslakhellesoy/eventsource-java.git + scm:git:https://github.com/andll/eventsource-java.git + scm:git:https://github.com/andll/eventsource-java.git + git://github.com/andll/eventsource-java.git - + + - repository.jboss.org - http://repository.jboss.org/nexus/content/groups/public/ + mind.releases + http://nexus.mindlabs.com/content/repositories/mind.releases - + + mind.snapshots + http://nexus.mindlabs.com/content/repositories/mind.snapshots + + + org.jboss.netty @@ -67,23 +69,33 @@ 1.6 - - org.apache.maven.plugins - maven-gpg-plugin - 1.1 - - true - - - - sign-artifacts - verify - - sign - - - - + + + + sign + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.1 + + true + + + + sign-artifacts + verify + + sign + + + + + + + + diff --git a/src/main/java/com/github/eventsource/client/EventSource.java b/src/main/java/com/github/eventsource/client/EventSource.java index 17d8cce..ffd010d 100644 --- a/src/main/java/com/github/eventsource/client/EventSource.java +++ b/src/main/java/com/github/eventsource/client/EventSource.java @@ -2,67 +2,35 @@ import com.github.eventsource.client.impl.AsyncEventSourceHandler; import com.github.eventsource.client.impl.netty.EventSourceChannelHandler; -import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; -import org.jboss.netty.handler.codec.frame.Delimiters; -import org.jboss.netty.handler.codec.http.HttpRequestEncoder; -import org.jboss.netty.handler.codec.string.StringDecoder; -import java.net.InetSocketAddress; import java.net.URI; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -public class EventSource { +public class EventSource { public static final long DEFAULT_RECONNECTION_TIME_MILLIS = 2000; - public static final int CONNECTING = 0; - public static final int OPEN = 1; - public static final int CLOSED = 2; - - private final ClientBootstrap bootstrap; private final EventSourceChannelHandler clientHandler; - private int readyState; - /** - * Creates a new EventSource client. The client will reconnect on - * lost connections automatically, unless the connection is closed explicitly by a call to + * Creates a new EventSource client. The client will reconnect on + * lost connections automatically, unless the connection is closed explicitly by a call to * {@link com.github.eventsource.client.EventSource#close()}. * * For sample usage, see examples at GitHub. - * - * @param executor the executor that will receive events + * + * @param eventSourceClient EventSourceClient to start event source at * @param reconnectionTimeMillis delay before a reconnect is made - in the event of a lost connection * @param uri where to connect * @param eventSourceHandler receives events * @see #close() */ - public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) { - bootstrap = new ClientBootstrap( - new NioClientSocketChannelFactory( - Executors.newSingleThreadExecutor(), - Executors.newSingleThreadExecutor())); - bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort())); - - clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri); - - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter())); - pipeline.addLast("string", new StringDecoder()); + public EventSource(EventSourceClient eventSourceClient, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) { + clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(eventSourceClient.getEventExecutor(), eventSourceHandler), reconnectionTimeMillis, eventSourceClient, uri); + } - pipeline.addLast("encoder", new HttpRequestEncoder()); - pipeline.addLast("es-handler", clientHandler); - return pipeline; - } - }); + public EventSource(Executor eventExecutor, long reconnectionTimeMillis, URI uri, EventSourceHandler eventSourceHandler) { + this(new EventSourceClient(eventExecutor), reconnectionTimeMillis, uri, eventSourceHandler); } public EventSource(String uri, EventSourceHandler eventSourceHandler) { @@ -70,12 +38,11 @@ public EventSource(String uri, EventSourceHandler eventSourceHandler) { } public EventSource(URI uri, EventSourceHandler eventSourceHandler) { - this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler); + this(new EventSourceClient(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler); } public ChannelFuture connect() { - readyState = CONNECTING; - return bootstrap.connect(); + return clientHandler.connect(); } /** diff --git a/src/main/java/com/github/eventsource/client/EventSourceClient.java b/src/main/java/com/github/eventsource/client/EventSourceClient.java new file mode 100644 index 0000000..dc30d25 --- /dev/null +++ b/src/main/java/com/github/eventsource/client/EventSourceClient.java @@ -0,0 +1,91 @@ +package com.github.eventsource.client; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.*; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; +import org.jboss.netty.handler.codec.frame.Delimiters; +import org.jboss.netty.handler.codec.http.HttpRequestEncoder; +import org.jboss.netty.handler.codec.string.StringDecoder; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +public class EventSourceClient { + private final ClientBootstrap bootstrap; + private final Executor eventExecutor; + + private final HashMap handlerMap = new HashMap(); + + public EventSourceClient() { + this(Executors.newSingleThreadExecutor()); + } + + public EventSourceClient(Executor eventExecutor) { + this.eventExecutor = eventExecutor; + bootstrap = new ClientBootstrap( + new NioClientSocketChannelFactory( + Executors.newSingleThreadExecutor(), + Executors.newCachedThreadPool())); + + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter())); + pipeline.addLast("string", new StringDecoder()); + + pipeline.addLast("encoder", new HttpRequestEncoder()); + pipeline.addLast("es-handler", new Handler()); + return pipeline; + } + }); + } + + public ChannelFuture connect(InetSocketAddress address, ChannelUpstreamHandler handler) { + synchronized (handlerMap) { + ChannelFuture channelFuture = bootstrap.connect(address); + handlerMap.put(channelFuture.getChannel(), handler); + return channelFuture; + } + } + + private class Handler extends SimpleChannelUpstreamHandler { + @Override + public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { + final ChannelUpstreamHandler handler; + synchronized (handlerMap) { + handler = handlerMap.get(ctx.getChannel()); + } + if (handler == null) { + super.handleUpstream(ctx, e); + + if (e instanceof ChannelStateEvent && ((ChannelStateEvent) e).getState() == ChannelState.OPEN) { + return; //Do nothing, this one will not be dispatched to handler, but it's ok + } + + System.err.println("Something wrong with dispatching"); + } else { + handler.handleUpstream(ctx, e); + + if (e instanceof ChannelStateEvent) { + ChannelStateEvent stateEvent = (ChannelStateEvent) e; + if (stateEvent.getState() == ChannelState.BOUND && stateEvent.getValue() == null) { + synchronized (handlerMap) { + handlerMap.remove(ctx.getChannel()); + } + } + } + } + } + } + + public Executor getEventExecutor() { + return eventExecutor; + } + + public void shutdown() { + bootstrap.releaseExternalResources(); + } +} diff --git a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java index 6ad7d7b..f301a5a 100644 --- a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java +++ b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java @@ -1,10 +1,10 @@ package com.github.eventsource.client.impl.netty; +import com.github.eventsource.client.EventSourceClient; import com.github.eventsource.client.EventSourceException; import com.github.eventsource.client.EventSourceHandler; import com.github.eventsource.client.impl.ConnectionHandler; import com.github.eventsource.client.impl.EventStreamParser; -import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.*; import org.jboss.netty.handler.codec.http.DefaultHttpRequest; import org.jboss.netty.handler.codec.http.HttpHeaders.Names; @@ -17,6 +17,7 @@ import org.jboss.netty.util.TimerTask; import java.net.ConnectException; +import java.net.InetSocketAddress; import java.net.URI; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,14 +26,14 @@ public class EventSourceChannelHandler extends SimpleChannelUpstreamHandler implements ConnectionHandler { private static final Pattern STATUS_PATTERN = Pattern.compile("HTTP/1.1 (\\d+) (.*)"); - private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream"); + private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream;?.*"); private final EventSourceHandler eventSourceHandler; - private final ClientBootstrap bootstrap; + private final EventSourceClient client; private final URI uri; private final EventStreamParser messageDispatcher; - private final Timer timer = new HashedWheelTimer(); + private static final Timer TIMER = new HashedWheelTimer(); private Channel channel; private boolean reconnectOnClose = true; private long reconnectionTimeMillis; @@ -42,10 +43,10 @@ public class EventSourceChannelHandler extends SimpleChannelUpstreamHandler impl private Integer status; private AtomicBoolean reconnecting = new AtomicBoolean(false); - public EventSourceChannelHandler(EventSourceHandler eventSourceHandler, long reconnectionTimeMillis, ClientBootstrap bootstrap, URI uri) { + public EventSourceChannelHandler(EventSourceHandler eventSourceHandler, long reconnectionTimeMillis, EventSourceClient client, URI uri) { this.eventSourceHandler = eventSourceHandler; this.reconnectionTimeMillis = reconnectionTimeMillis; - this.bootstrap = bootstrap; + this.client = client; this.uri = uri; this.messageDispatcher = new EventStreamParser(uri.toString(), eventSourceHandler, this); } @@ -143,6 +144,10 @@ public EventSourceChannelHandler close() { return this; } + public ChannelFuture connect() { + return client.connect(getConnectAddress(), this); + } + public EventSourceChannelHandler join() throws InterruptedException { if (channel != null) { channel.getCloseFuture().await(); @@ -151,15 +156,23 @@ public EventSourceChannelHandler join() throws InterruptedException { } private void reconnect() { - if(!reconnecting.get()) { - reconnecting.set(true); - timer.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - reconnecting.set(false); - bootstrap.connect().await(); - } - }, reconnectionTimeMillis, TimeUnit.MILLISECONDS); + if(reconnectionTimeMillis >= 0) { + if(!reconnecting.get()) { + reconnecting.set(true); + TIMER.newTimeout(new TimerTask() + { + @Override + public void run(Timeout timeout) throws Exception + { + reconnecting.set(false); + connect().await(); + } + }, reconnectionTimeMillis, TimeUnit.MILLISECONDS); + } } } + + public InetSocketAddress getConnectAddress() { + return new InetSocketAddress(uri.getHost(), uri.getPort() == -1 ? 80 : uri.getPort()); + } } \ No newline at end of file diff --git a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java index 02fb54a..1203795 100644 --- a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java +++ b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java @@ -12,7 +12,6 @@ import java.net.URI; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static java.util.Arrays.asList; @@ -84,7 +83,7 @@ public void onClose(EventSourceConnection connection) throws Exception { webServer.start(); - eventSource = new EventSource(Executors.newSingleThreadExecutor(), 100, URI.create("http://localhost:59504/es/hello"), new EventSourceHandler() { + eventSource = new EventSource(new EventSourceClient(), 100, URI.create("http://localhost:59504/es/hello"), new EventSourceHandler() { @Override public void onConnect() { } @@ -129,7 +128,7 @@ private void assertSentAndReceived(final List messages) throws IOExcepti } private void startClient(final List expectedMessages, final CountDownLatch messageCountdown, final CountDownLatch errorCountdown, long reconnectionTimeMillis) throws InterruptedException { - eventSource = new EventSource(Executors.newSingleThreadExecutor(), reconnectionTimeMillis, URI.create("http://localhost:59504/es/hello?echoThis=yo"), new EventSourceHandler() { + eventSource = new EventSource(new EventSourceClient(), reconnectionTimeMillis, URI.create("http://localhost:59504/es/hello?echoThis=yo"), new EventSourceHandler() { int n = 0; @Override