Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 94 additions & 6 deletions src/main/java/com/github/eventsource/client/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.github.eventsource.client.impl.AsyncEventSourceHandler;
import com.github.eventsource.client.impl.netty.EventSourceChannelHandler;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
Expand All @@ -12,13 +13,19 @@
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.ssl.SslHandler;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import java.net.InetSocketAddress;
import java.net.URI;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class EventSource {
public class EventSource implements EventSourceHandler {
public static final long DEFAULT_RECONNECTION_TIME_MILLIS = 2000;

public static final int CONNECTING = 0;
Expand All @@ -27,7 +34,9 @@ public class EventSource {

private final ClientBootstrap bootstrap;
private final EventSourceChannelHandler clientHandler;
private final EventSourceHandler eventSourceHandler;

private URI uri;
private int readyState;

/**
Expand All @@ -43,18 +52,49 @@ public class EventSource {
* @param eventSourceHandler receives events
* @see #close()
*/
public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
public EventSource(Executor executor, long reconnectionTimeMillis, final URI pURI, EventSourceHandler eventSourceHandler) {
this(executor, reconnectionTimeMillis, pURI, null, eventSourceHandler);
}

public EventSource(Executor executor, long reconnectionTimeMillis, final URI pURI, SSLEngineFactory fSSLEngine, EventSourceHandler eventSourceHandler) {
this.eventSourceHandler = eventSourceHandler;


bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor()));
bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort()));
if (pURI.getScheme().equals("https") && fSSLEngine == null) {
fSSLEngine = new SSLEngineFactory();
} else {
//If we don't do this then the pipeline still attempts to use SSL
fSSLEngine = null;
}
final SSLEngineFactory SSLFactory = fSSLEngine;

uri = pURI;
int port = uri.getPort();
if (port==-1)
{
port = (uri.getScheme().equals("https"))?443:80;
}
bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), port));

// add this class as the event source handler so the connect() call can be intercepted
AsyncEventSourceHandler asyncHandler = new AsyncEventSourceHandler(executor, this);

clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri);
clientHandler = new EventSourceChannelHandler(asyncHandler, reconnectionTimeMillis, bootstrap, uri);

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();

if (SSLFactory != null) {
SSLEngine sslEngine = SSLFactory.GetNewSSLEngine();
sslEngine.setUseClientMode(true);
pipeline.addLast("ssl", new SslHandler(sslEngine));
}

pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
pipeline.addLast("string", new StringDecoder());

Expand All @@ -66,24 +106,45 @@ public ChannelPipeline getPipeline() throws Exception {
}

public EventSource(String uri, EventSourceHandler eventSourceHandler) {
this(URI.create(uri), eventSourceHandler);
this(uri, null, eventSourceHandler);
}

public EventSource(String uri, SSLEngineFactory sslEngineFactory, EventSourceHandler eventSourceHandler) {
this(URI.create(uri), sslEngineFactory, eventSourceHandler);
}

public EventSource(URI uri, EventSourceHandler eventSourceHandler) {
this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler);
this(uri, null, eventSourceHandler);
}

public EventSource(URI uri, SSLEngineFactory sslEngineFactory, EventSourceHandler eventSourceHandler) {
this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, sslEngineFactory, eventSourceHandler);
}

public ChannelFuture connect() {
readyState = CONNECTING;

//To avoid perpetual "SocketUnresolvedException"
int port = uri.getPort();
if (port==-1)
{
port = (uri.getScheme().equals("https"))?443:80;
}
bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), port));
return bootstrap.connect();
}

public boolean isConnected() {
return (readyState == OPEN);
}

/**
* Close the connection
*
* @return self
*/
public EventSource close() {
readyState = CLOSED;
clientHandler.close();
return this;
}
Expand All @@ -98,4 +159,31 @@ public EventSource join() throws InterruptedException {
clientHandler.join();
return this;
}

@Override
public void onConnect() throws Exception {
// flag the connection as open
readyState = OPEN;

// pass event to the proper handler
eventSourceHandler.onConnect();
}

@Override
public void onMessage(String event, MessageEvent message) throws Exception {
// pass event to the proper handler
eventSourceHandler.onMessage(event, message);
}

@Override
public void onError(Throwable t) {
// pass event to the proper handler
eventSourceHandler.onError(t);
}

@Override
public void onClosed(boolean willReconnect) {
// pass event to the proper handler
eventSourceHandler.onClosed(willReconnect);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ public interface EventSourceHandler {
void onConnect() throws Exception;
void onMessage(String event, MessageEvent message) throws Exception;
void onError(Throwable t);
void onClosed(boolean willReconnect);
}
26 changes: 26 additions & 0 deletions src/main/java/com/github/eventsource/client/SSLEngineFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.eventsource.client;

import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public class SSLEngineFactory {
SSLEngine GetNewSSLEngine() {
SSLEngine sslEngine = null;
SSLContext sslContext;
try {
sslContext = SSLContext.getInstance("TLS");
try {
sslContext.init(null, null, null);
sslEngine = sslContext.createSSLEngine();
} catch (KeyManagementException e) {
return null;
}
} catch (NoSuchAlgorithmException e1) {
return null;
}
return sslEngine;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ public void run() {
}
});
}

@Override
public void onClosed(final boolean willReconnect)
{
executor.execute(new Runnable() {
@Override
public void run() {
try {
eventSourceHandler.onClosed(willReconnect);
} catch (Exception e) {
onError(e);
}
}
});
}

@Override
public void onMessage(final String event, final MessageEvent message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.jboss.netty.util.TimerTask;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -60,7 +61,7 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString());
request.addHeader(Names.ACCEPT, "text/event-stream");
request.addHeader(Names.HOST, uri.getHost());
request.addHeader(Names.ORIGIN, "http://" + uri.getHost());
request.addHeader(Names.ORIGIN, uri.getScheme()+"://" + uri.getHost());
request.addHeader(Names.CACHE_CONTROL, "no-cache");
if (lastEventId != null) {
request.addHeader("Last-Event-ID", lastEventId);
Expand All @@ -76,6 +77,7 @@ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)

@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
eventSourceHandler.onClosed(reconnectOnClose);
if (reconnectOnClose) {
reconnect();
}
Expand Down Expand Up @@ -157,6 +159,11 @@ private void reconnect() {
@Override
public void run(Timeout timeout) throws Exception {
reconnecting.set(false);
int port = uri.getPort();
if (port==-1) {
port = (uri.getScheme().equals("https"))?443:80;
}
bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), port));
bootstrap.connect().await();
}
}, reconnectionTimeMillis, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public void setLastEventId(String lastEventId) {
public void onConnect() throws Exception {
connected = true;
}

@Override
public void onClosed(boolean willReconnect){
connected = false;
}

@Override
public void onMessage(String event, MessageEvent message) throws Exception {
Expand Down