Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 21 additions & 36 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,36 +1,26 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.aslakhellesoy</groupId>
<groupId>com.whizzosoftware</groupId>
<artifactId>eventsource-client</artifactId>
<name>${project.artifactId}</name>
<description>A Java EventSource Client</description>
<url>http://aslakhellesoy.github.com/eventsource-java</url>
<version>0.1.2.1</version>
<url>http://whizzosoftware.github.com/eventsource-java</url>
<version>0.1.3-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<groupId>org.sonatype.oss</groupId>
<artifactId>oss-parent</artifactId>
<version>6</version>
</parent>
<licenses>
<license>
<name>BSD License</name>
<url>http://www.opensource.org/licenses/bsd-license</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<connection>scm:git:git://github.com/aslakhellesoy/eventsource-java.git</connection>
<developerConnection>scm:git:git@github.com:aslakhellesoy/eventsource-java.git</developerConnection>
<url>git://github.com/aslakhellesoy/eventsource-java.git</url>
</scm>
<repositories>
<repository>
<id>repository.jboss.org</id>
<url>http://repository.jboss.org/nexus/content/groups/public/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.6</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
Expand Down Expand Up @@ -67,23 +57,18 @@
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.1</version>
<configuration>
<useAgent>true</useAgent>
</configuration>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>nx-releases</id>
<name>Nexus Releases</name>
<url>${nexus.releases.url}</url>
</repository>
<snapshotRepository>
<id>nx-snapshots</id>
<name>Nexus Snapshots</name>
<url>${nexus.snapshots.url}</url>
</snapshotRepository>
</distributionManagement>
</project>
93 changes: 88 additions & 5 deletions src/main/java/com/github/eventsource/client/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.ssl.SslHandler;

import javax.net.ssl.SSLEngine;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class EventSource {
public class EventSource implements EventSourceHandler {
public static final long DEFAULT_RECONNECTION_TIME_MILLIS = 2000;

public static final int CONNECTING = 0;
Expand All @@ -27,7 +29,9 @@ public class EventSource {

private final ClientBootstrap bootstrap;
private final EventSourceChannelHandler clientHandler;
private final EventSourceHandler eventSourceHandler;

private URI uri;
private int readyState;

/**
Expand All @@ -44,17 +48,35 @@ public class EventSource {
* @see #close()
*/
public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
this(executor, reconnectionTimeMillis, uri, null, eventSourceHandler);
}

public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, final SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
this.eventSourceHandler = eventSourceHandler;

bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor()));
bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort()));
this.uri = uri;

bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), getPort(uri)));

clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri);
// add this class as the event source handler so the connect() call can be intercepted
AsyncEventSourceHandler asyncHandler = new AsyncEventSourceHandler(executor, this);

clientHandler = new EventSourceChannelHandler(asyncHandler, reconnectionTimeMillis, bootstrap, uri);

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();

if (uri.getScheme().equals("https") && sslEngineProvider != null) {
SSLEngine engine = sslEngineProvider.createSSLEngine();
engine.setUseClientMode(true);
pipeline.addLast("ssl", new SslHandler(engine));
}

pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
pipeline.addLast("string", new StringDecoder());

Expand All @@ -66,24 +88,51 @@ public ChannelPipeline getPipeline() throws Exception {
}

public EventSource(String uri, EventSourceHandler eventSourceHandler) {
this(URI.create(uri), eventSourceHandler);
this(uri, null, eventSourceHandler);
}

public EventSource(String uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
this(URI.create(uri), sslEngineProvider, eventSourceHandler);
}

public EventSource(URI uri, EventSourceHandler eventSourceHandler) {
this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler);
this(uri, null, eventSourceHandler);
}

public EventSource(URI uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, sslEngineProvider, eventSourceHandler);
}

/**
* Sets a custom HTTP header that will be used when the request is made to establish the SSE channel.
*
* @param name the HTTP header name
* @param value the header value
*/
public void setCustomRequestHeader(String name, String value) {
clientHandler.setCustomRequestHeader(name, value);
}

public ChannelFuture connect() {
readyState = CONNECTING;

//To avoid perpetual "SocketUnresolvedException"
bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), getPort(uri)));

return bootstrap.connect();
}

public boolean isConnected() {
return (readyState == OPEN);
}

/**
* Close the connection
*
* @return self
*/
public EventSource close() {
readyState = CLOSED;
clientHandler.close();
return this;
}
Expand All @@ -98,4 +147,38 @@ public EventSource join() throws InterruptedException {
clientHandler.join();
return this;
}

@Override
public void onConnect() throws Exception {
// flag the connection as open
readyState = OPEN;

// pass event to the proper handler
eventSourceHandler.onConnect();
}

@Override
public void onMessage(String event, MessageEvent message) throws Exception {
// pass event to the proper handler
eventSourceHandler.onMessage(event, message);
}

@Override
public void onError(Throwable t) {
// pass event to the proper handler
eventSourceHandler.onError(t);
}

@Override
public void onClosed(boolean willReconnect) {
eventSourceHandler.onClosed(willReconnect);
}

protected int getPort(URI uri) {
int port = uri.getPort();
if (port == -1) {
port = (uri.getScheme().equals("https")) ? 443 : 80;
}
return port;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ public interface EventSourceHandler {
void onConnect() throws Exception;
void onMessage(String event, MessageEvent message) throws Exception;
void onError(Throwable t);
void onClosed(boolean willReconnect);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.github.eventsource.client;

import javax.net.ssl.SSLEngine;

public interface SSLEngineProvider {
SSLEngine createSSLEngine();
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,18 @@ public void run() {
}
});
}

@Override
public void onClosed(final boolean willReconnect) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
eventSourceHandler.onClosed(willReconnect);
} catch (Exception e) {
onError(e);
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@
import com.github.eventsource.client.impl.ConnectionHandler;
import com.github.eventsource.client.impl.EventStreamParser;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;

import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ExceptionEvent;

import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import org.jboss.netty.handler.codec.http.HttpMethod;
Expand All @@ -17,20 +25,24 @@
import org.jboss.netty.util.TimerTask;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EventSourceChannelHandler extends SimpleChannelUpstreamHandler implements ConnectionHandler {
private static final Pattern STATUS_PATTERN = Pattern.compile("HTTP/1.1 (\\d+) (.*)");
private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream");
private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream", Pattern.CASE_INSENSITIVE);

private final EventSourceHandler eventSourceHandler;
private final ClientBootstrap bootstrap;
private final URI uri;
private final EventStreamParser messageDispatcher;
private final Map<String,String> customRequestHeaders = new HashMap<String,String>();

private final Timer timer = new HashedWheelTimer();
private Channel channel;
Expand All @@ -57,14 +69,23 @@ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exc

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString());
final String query = uri.getQuery();
final String path = uri.getPath() + (((null != query) && !query.isEmpty()) ? "?" + query : "");
final int port = uri.getPort();
final String portPostfix = ((port != -1) ? ":" + port : "");
final String host = uri.getHost() + portPostfix;
final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
request.addHeader(Names.ACCEPT, "text/event-stream");
request.addHeader(Names.HOST, uri.getHost());
request.addHeader(Names.ORIGIN, "http://" + uri.getHost());
request.addHeader(Names.HOST, host);
request.addHeader(Names.ORIGIN, uri.getScheme() + "://" + host);
request.addHeader(Names.CACHE_CONTROL, "no-cache");
if (lastEventId != null) {
request.addHeader("Last-Event-ID", lastEventId);
}
// add any custom headers that have been set
for (String name : customRequestHeaders.keySet()) {
request.addHeader(name, customRequestHeaders.get(name));
}
e.getChannel().write(request);
channel = e.getChannel();
}
Expand All @@ -76,6 +97,10 @@ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)

@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if (eventStreamOk) {
// call onClosed only if it was successfully opened (and onConnect was called)
eventSourceHandler.onClosed(reconnectOnClose);
}
if (reconnectOnClose) {
reconnect();
}
Expand Down Expand Up @@ -150,16 +175,28 @@ public EventSourceChannelHandler join() throws InterruptedException {
return this;
}

/**
* Sets a custom HTTP header that will be used when the request is made to establish the SSE channel.
*
* @param name the HTTP header name
* @param value the header value
*/
public void setCustomRequestHeader(String name, String value) {
customRequestHeaders.put(name, value);
}

private void reconnect() {
if(!reconnecting.get()) {
reconnecting.set(true);
if (reconnecting.compareAndSet(false, true)) {
headerDone = false;
eventStreamOk = false;
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
reconnecting.set(false);
bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort()));
bootstrap.connect().await();
}
}, reconnectionTimeMillis, TimeUnit.MILLISECONDS);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public void onConnect() throws Exception {
connected = true;
}

@Override
public void onClosed(boolean willReconnect){
connected = false;
}

@Override
public void onMessage(String event, MessageEvent message) throws Exception {
getMessageEvents(event).add(message);
Expand Down
Loading