diff --git a/src/main/java/com/github/eventsource/client/EventSource.java b/src/main/java/com/github/eventsource/client/EventSource.java index 17d8cce..e8617d9 100644 --- a/src/main/java/com/github/eventsource/client/EventSource.java +++ b/src/main/java/com/github/eventsource/client/EventSource.java @@ -2,6 +2,7 @@ import com.github.eventsource.client.impl.AsyncEventSourceHandler; import com.github.eventsource.client.impl.netty.EventSourceChannelHandler; + import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; @@ -12,13 +13,19 @@ import org.jboss.netty.handler.codec.frame.Delimiters; import org.jboss.netty.handler.codec.http.HttpRequestEncoder; import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.handler.ssl.SslHandler; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import java.net.InetSocketAddress; import java.net.URI; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -public class EventSource { +public class EventSource implements EventSourceHandler { public static final long DEFAULT_RECONNECTION_TIME_MILLIS = 2000; public static final int CONNECTING = 0; @@ -27,7 +34,9 @@ public class EventSource { private final ClientBootstrap bootstrap; private final EventSourceChannelHandler clientHandler; + private final EventSourceHandler eventSourceHandler; + private URI uri; private int readyState; /** @@ -43,18 +52,49 @@ public class EventSource { * @param eventSourceHandler receives events * @see #close() */ - public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) { + public EventSource(Executor executor, long reconnectionTimeMillis, final URI pURI, EventSourceHandler eventSourceHandler) { + this(executor, reconnectionTimeMillis, pURI, null, eventSourceHandler); + } + + public EventSource(Executor executor, long reconnectionTimeMillis, final URI pURI, SSLEngineFactory fSSLEngine, EventSourceHandler eventSourceHandler) { + this.eventSourceHandler = eventSourceHandler; + + bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor())); - bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort())); + if (pURI.getScheme().equals("https") && fSSLEngine == null) { + fSSLEngine = new SSLEngineFactory(); + } else { + //If we don't do this then the pipeline still attempts to use SSL + fSSLEngine = null; + } + final SSLEngineFactory SSLFactory = fSSLEngine; + + uri = pURI; + int port = uri.getPort(); + if (port==-1) + { + port = (uri.getScheme().equals("https"))?443:80; + } + bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), port)); + + // add this class as the event source handler so the connect() call can be intercepted + AsyncEventSourceHandler asyncHandler = new AsyncEventSourceHandler(executor, this); - clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri); + clientHandler = new EventSourceChannelHandler(asyncHandler, reconnectionTimeMillis, bootstrap, uri); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); + + if (SSLFactory != null) { + SSLEngine sslEngine = SSLFactory.GetNewSSLEngine(); + sslEngine.setUseClientMode(true); + pipeline.addLast("ssl", new SslHandler(sslEngine)); + } + pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter())); pipeline.addLast("string", new StringDecoder()); @@ -66,24 +106,45 @@ public ChannelPipeline getPipeline() throws Exception { } public EventSource(String uri, EventSourceHandler eventSourceHandler) { - this(URI.create(uri), eventSourceHandler); + this(uri, null, eventSourceHandler); + } + + public EventSource(String uri, SSLEngineFactory sslEngineFactory, EventSourceHandler eventSourceHandler) { + this(URI.create(uri), sslEngineFactory, eventSourceHandler); } public EventSource(URI uri, EventSourceHandler eventSourceHandler) { - this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler); + this(uri, null, eventSourceHandler); + } + + public EventSource(URI uri, SSLEngineFactory sslEngineFactory, EventSourceHandler eventSourceHandler) { + this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, sslEngineFactory, eventSourceHandler); } public ChannelFuture connect() { readyState = CONNECTING; + + //To avoid perpetual "SocketUnresolvedException" + int port = uri.getPort(); + if (port==-1) + { + port = (uri.getScheme().equals("https"))?443:80; + } + bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), port)); return bootstrap.connect(); } + public boolean isConnected() { + return (readyState == OPEN); + } + /** * Close the connection * * @return self */ public EventSource close() { + readyState = CLOSED; clientHandler.close(); return this; } @@ -98,4 +159,31 @@ public EventSource join() throws InterruptedException { clientHandler.join(); return this; } + + @Override + public void onConnect() throws Exception { + // flag the connection as open + readyState = OPEN; + + // pass event to the proper handler + eventSourceHandler.onConnect(); + } + + @Override + public void onMessage(String event, MessageEvent message) throws Exception { + // pass event to the proper handler + eventSourceHandler.onMessage(event, message); + } + + @Override + public void onError(Throwable t) { + // pass event to the proper handler + eventSourceHandler.onError(t); + } + + @Override + public void onClosed(boolean willReconnect) { + // pass event to the proper handler + eventSourceHandler.onClosed(willReconnect); + } } diff --git a/src/main/java/com/github/eventsource/client/EventSourceHandler.java b/src/main/java/com/github/eventsource/client/EventSourceHandler.java index b50b3d0..2ec35bf 100644 --- a/src/main/java/com/github/eventsource/client/EventSourceHandler.java +++ b/src/main/java/com/github/eventsource/client/EventSourceHandler.java @@ -4,4 +4,5 @@ public interface EventSourceHandler { void onConnect() throws Exception; void onMessage(String event, MessageEvent message) throws Exception; void onError(Throwable t); + void onClosed(boolean willReconnect); } diff --git a/src/main/java/com/github/eventsource/client/SSLEngineFactory.java b/src/main/java/com/github/eventsource/client/SSLEngineFactory.java new file mode 100644 index 0000000..22da527 --- /dev/null +++ b/src/main/java/com/github/eventsource/client/SSLEngineFactory.java @@ -0,0 +1,26 @@ +package com.github.eventsource.client; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +public class SSLEngineFactory { + SSLEngine GetNewSSLEngine() { + SSLEngine sslEngine = null; + SSLContext sslContext; + try { + sslContext = SSLContext.getInstance("TLS"); + try { + sslContext.init(null, null, null); + sslEngine = sslContext.createSSLEngine(); + } catch (KeyManagementException e) { + return null; + } + } catch (NoSuchAlgorithmException e1) { + return null; + } + return sslEngine; + } +} diff --git a/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java b/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java index 89864f2..dd13c0c 100644 --- a/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java +++ b/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java @@ -27,6 +27,21 @@ public void run() { } }); } + + @Override + public void onClosed(final boolean willReconnect) + { + executor.execute(new Runnable() { + @Override + public void run() { + try { + eventSourceHandler.onClosed(willReconnect); + } catch (Exception e) { + onError(e); + } + } + }); + } @Override public void onMessage(final String event, final MessageEvent message) { diff --git a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java index 6ad7d7b..c7ecdc3 100644 --- a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java +++ b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java @@ -17,6 +17,7 @@ import org.jboss.netty.util.TimerTask; import java.net.ConnectException; +import java.net.InetSocketAddress; import java.net.URI; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -60,7 +61,7 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString()); request.addHeader(Names.ACCEPT, "text/event-stream"); request.addHeader(Names.HOST, uri.getHost()); - request.addHeader(Names.ORIGIN, "http://" + uri.getHost()); + request.addHeader(Names.ORIGIN, uri.getScheme()+"://" + uri.getHost()); request.addHeader(Names.CACHE_CONTROL, "no-cache"); if (lastEventId != null) { request.addHeader("Last-Event-ID", lastEventId); @@ -76,6 +77,7 @@ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + eventSourceHandler.onClosed(reconnectOnClose); if (reconnectOnClose) { reconnect(); } @@ -157,6 +159,11 @@ private void reconnect() { @Override public void run(Timeout timeout) throws Exception { reconnecting.set(false); + int port = uri.getPort(); + if (port==-1) { + port = (uri.getScheme().equals("https"))?443:80; + } + bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), port)); bootstrap.connect().await(); } }, reconnectionTimeMillis, TimeUnit.MILLISECONDS); diff --git a/src/main/java/com/github/eventsource/client/stubs/StubHandler.java b/src/main/java/com/github/eventsource/client/stubs/StubHandler.java index 5977afb..03e6f4c 100644 --- a/src/main/java/com/github/eventsource/client/stubs/StubHandler.java +++ b/src/main/java/com/github/eventsource/client/stubs/StubHandler.java @@ -31,6 +31,11 @@ public void setLastEventId(String lastEventId) { public void onConnect() throws Exception { connected = true; } + + @Override + public void onClosed(boolean willReconnect){ + connected = false; + } @Override public void onMessage(String event, MessageEvent message) throws Exception {