Skip to content

Commit 52c5db3

Browse files
committed
Operational Beta release. Bug with image data over 65k, tends not to receive entire buffer sometimes. channel bug?
1 parent 19afda9 commit 52c5db3

21 files changed

+195
-112
lines changed

src/main/java/org/ros/internal/node/service/DefaultServiceServer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import org.ros.address.AdvertiseAddress;
99
import org.ros.concurrent.ListenerGroup;
1010
import org.ros.concurrent.SignalRunnable;
11+
import org.ros.internal.message.MessageBuffers;
1112
import org.ros.internal.message.service.ServiceDescription;
1213
import org.ros.internal.node.topic.DefaultPublisher;
14+
import org.ros.internal.system.Utility;
1315
import org.ros.internal.transport.ChannelHandler;
1416
import org.ros.internal.transport.ConnectionHeader;
1517
import org.ros.internal.transport.ConnectionHeaderFields;
@@ -75,7 +77,7 @@ public void onMasterUnregistrationFailure(ServiceServer<T, S> registrant) {
7577
});
7678
}
7779

78-
public ConnectionHeader finishHandshake(ConnectionHeader incomingConnectionHeader) {
80+
public ByteBuffer finishHandshake(ConnectionHeader incomingConnectionHeader) {
7981
if (DEBUG) {
8082
log.info("Client handshake header: " + incomingConnectionHeader);
8183
}
@@ -88,7 +90,9 @@ public ConnectionHeader finishHandshake(ConnectionHeader incomingConnectionHeade
8890
if (DEBUG) {
8991
log.info("Server handshake header: " + connectionHeader);
9092
}
91-
return connectionHeader;
93+
ByteBuffer headbuf = MessageBuffers.dynamicBuffer();
94+
Utility.serialize(connectionHeader, headbuf);
95+
return headbuf;
9296
}
9397

9498
@Override

src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
//import org.jboss.netty.channel.ChannelHandlerContext;
66
//import org.jboss.netty.channel.MessageEvent;
77
//import org.jboss.netty.channel.SimpleChannelHandler;
8+
import org.ros.exception.RosRuntimeException;
89
import org.ros.exception.ServiceException;
910
import org.ros.internal.message.MessageBufferPool;
1011
import org.ros.internal.system.Utility;
@@ -13,6 +14,7 @@
1314
import org.ros.message.MessageFactory;
1415
import org.ros.node.service.ServiceResponseBuilder;
1516

17+
import java.io.IOException;
1618
import java.nio.ByteBuffer;
1719
import java.nio.charset.Charset;
1820
import java.util.concurrent.ExecutorService;
@@ -39,9 +41,7 @@ public ServiceRequestHandler(ServiceDeclaration serviceDeclaration,
3941
messageBufferPool = new MessageBufferPool();
4042
}
4143

42-
private void handleRequest(ByteBuffer requestBuffer, ByteBuffer responseBuffer)
43-
throws ServiceException {
44-
T request = (T) Utility.deserialize(requestBuffer);
44+
private void handleRequest(T request, ByteBuffer responseBuffer) throws ServiceException {
4545
S response = messageFactory.newFromType(serviceDeclaration.getType());
4646
responseBuilder.build(request, response);
4747
Utility.serialize(response, responseBuffer);
@@ -51,24 +51,34 @@ private void handleSuccess(final ChannelHandlerContext ctx, ServiceServerRespons
5151
response.setErrorCode(1);
5252
response.setMessageLength(responseBuffer.limit());
5353
response.setMessage(responseBuffer);
54-
ctx.write(response);
54+
ByteBuffer resbuf = messageBufferPool.acquire();
55+
Utility.serialize(response, resbuf);
56+
try {
57+
ctx.write(resbuf);
58+
} catch (IOException e) {
59+
throw new RosRuntimeException(e);
60+
}
5561
}
5662

57-
private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response,
58-
String message) {
63+
private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response, String message, ByteBuffer responseBuffer) {
5964
response.setErrorCode(0);
6065
ByteBuffer encodedMessage = Charset.forName("US-ASCII").encode(message);
6166
response.setMessageLength(encodedMessage.limit());
6267
response.setMessage(encodedMessage);
63-
ctx.write(response);
68+
Utility.serialize(response, responseBuffer);
69+
try {
70+
ctx.write(responseBuffer);
71+
} catch (IOException e) {
72+
throw new RosRuntimeException(e);
73+
}
6474
}
6575

6676
@Override
6777
public Object channelRead(final ChannelHandlerContext ctx, Object e) throws Exception {
6878
// Although the ChannelHandlerContext is explicitly documented as being safe
6979
// to keep for later use, the MessageEvent is not. So, we make a defensive
7080
// copy of the buffer.
71-
final ByteBuffer requestBuffer = ((ByteBuffer) e);
81+
final T requestBuffer = ((T) e);
7282
this.executorService.execute(new Runnable() {
7383
@Override
7484
public void run() {
@@ -79,7 +89,7 @@ public void run() {
7989
handleRequest(requestBuffer, responseBuffer);
8090
success = true;
8191
} catch (ServiceException ex) {
82-
handleError(ctx, response, ex.getMessage());
92+
handleError(ctx, response, ex.getMessage(), responseBuffer);
8393
success = false;
8494
}
8595
if (success) {

src/main/java/org/ros/internal/system/Utility.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
*
2121
*/
2222
public class Utility {
23-
private static boolean DEBUG = false;
23+
private static boolean DEBUG = true;
2424
private static final Log log = LogFactory.getLog(Utility.class);
25+
2526
public static <T> void serialize(T value, ByteBuffer buffer) {
26-
//serializer.serialize((Message) value, buffer);
2727
DirectByteArrayOutputStream dbaos = new DirectByteArrayOutputStream();
2828
ObjectOutputStream oos;
2929
try {
@@ -41,7 +41,6 @@ public static <T> void serialize(T value, ByteBuffer buffer) {
4141

4242

4343
public static Object deserialize(ByteBuffer buffer) {
44-
//return deserializer.deserialize(buffer);
4544
byte[] obuf = buffer.array();
4645
Object Od = null;
4746
try {
@@ -53,8 +52,7 @@ public static Object deserialize(ByteBuffer buffer) {
5352
s.close();
5453
bais.close();
5554
//rbc.close();
56-
} catch (IOException ioe) {
57-
} catch (ClassNotFoundException cnf) {
55+
} catch (IOException | ClassNotFoundException cnf) {
5856
log.error("Class cannot be deserialized, may have been modified beyond version compatibility");
5957
}
6058
if( DEBUG )

src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414
import java.util.concurrent.ExecutorService;
1515

1616
/**
17-
* Abstraction of top level ChannelHandler interface
17+
* Abstraction of top level ChannelHandler interface.
1818
* Common functionality for {@link ClientHandshake} handlers.
19+
* @author jg
1920
*
2021
*/
2122
public abstract class BaseClientHandshakeHandler extends AbstractNamedChannelHandler {

src/main/java/org/ros/internal/transport/ChannelHandlerContext.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77
import java.nio.channels.AsynchronousSocketChannel;
88
import java.nio.channels.Channel;
99
import java.nio.channels.CompletionHandler;
10+
import java.nio.channels.SocketChannel;
1011
import java.util.Set;
1112
import java.util.concurrent.Executor;
1213
import java.util.concurrent.Future;
1314

15+
import org.ros.internal.transport.tcp.ChannelGroup;
16+
1417
public interface ChannelHandlerContext {
1518
/**
1619
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
@@ -39,7 +42,7 @@ public interface ChannelHandlerContext {
3942
* called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
4043
* {@link Channel}.
4144
*/
42-
AsynchronousSocketChannel bind(SocketAddress localAddress) throws IOException;
45+
/*Asynchronous*/SocketChannel bind(SocketAddress localAddress) throws IOException;
4346

4447
/**
4548
* Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
@@ -53,8 +56,9 @@ public interface ChannelHandlerContext {
5356
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)}
5457
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
5558
* {@link Channel}.
59+
* @throws IOException
5660
*/
57-
void connect(SocketAddress remoteAddress);
61+
void connect(SocketAddress remoteAddress) throws IOException;
5862

5963
/**
6064
* Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
@@ -109,8 +113,9 @@ public interface ChannelHandlerContext {
109113
* {@link ChannelHandler#read(ChannelHandlerContext)}
110114
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
111115
* {@link Channel}.
116+
* @throws IOException
112117
*/
113-
Future<Integer> read(ByteBuffer buf);
118+
/*Future<Integer>*/int read(ByteBuffer buf) throws IOException;
114119
/**
115120
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
116121
* {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was
@@ -122,22 +127,24 @@ public interface ChannelHandlerContext {
122127
* {@link ChannelHandler#read(ChannelHandlerContext)}
123128
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
124129
* {@link Channel}.
130+
* @throws IOException
125131
*/
126132
void read(ByteBuffer buf,CompletionHandler<Integer, Void> completionHandler);
127133

128134
/**
129135
* Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
130136
* This method will not request to actual flush, so be sure to call {@link #flush()}
131137
* once you want to request to flush all pending data to the actual transport.
138+
* @throws IOException
132139
*/
133-
Future<Integer> write(Object msg);
140+
/*Future<Integer>*/int write(ByteBuffer msg) throws IOException;
134141

135142
/**
136143
* Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
137144
* This method will not request to actual flush, so be sure to call {@link #flush()}
138145
* once you want to request to flush all pending data to the actual transport.
139146
*/
140-
void write(Object msg, CompletionHandler<Integer,Void> handler);
147+
void write(ByteBuffer msg, CompletionHandler<Integer,Void> handler);
141148

142149

143150
/**
@@ -149,7 +156,7 @@ public interface ChannelHandlerContext {
149156
* Return the channel group
150157
*
151158
*/
152-
AsynchronousChannelGroup getChannelGroup();
159+
/*Asynchronous*/ChannelGroup getChannelGroup();
153160

154161
/**
155162
* Determine if this channel is ready for processing, it is configured, has a socket

src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
package org.ros.internal.transport;
22

33
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
46
import java.net.SocketAddress;
57
import java.nio.ByteBuffer;
6-
import java.nio.channels.AsynchronousChannelGroup;
8+
//import java.nio.channels.AsynchronousChannelGroup;
79
import java.nio.channels.AsynchronousSocketChannel;
810
import java.nio.channels.Channel;
911
import java.nio.channels.CompletionHandler;
12+
import java.nio.channels.SocketChannel;
1013
import java.util.Collections;
1114
import java.util.HashSet;
1215
import java.util.Set;
1316
import java.util.concurrent.Executor;
1417
import java.util.concurrent.Future;
18+
import java.util.concurrent.TimeUnit;
19+
20+
import org.ros.internal.transport.tcp.ChannelGroup;
1521

1622
/**
1723
* A handler context contains all the executor, the channel group, the channel, and the pipeline with the handlers.
@@ -29,24 +35,24 @@
2935
*
3036
*/
3137
public class ChannelHandlerContextImpl implements ChannelHandlerContext {
32-
AsynchronousChannelGroup channelGroup;
38+
/*Asynchronous*/ChannelGroup channelGroup;
3339
Executor executor;
34-
AsynchronousSocketChannel channel;
40+
/*Asynchronous*/SocketChannel channel;
3541
ChannelPipeline pipeline;
3642
boolean ready = false;
3743
Object mutex = new Object();
3844
Set<String> outboundMessageTypes;
3945

4046

41-
public ChannelHandlerContextImpl(AsynchronousChannelGroup grp, AsynchronousSocketChannel ch, Executor exc) {
42-
channelGroup = grp;
43-
channel = ch;
47+
public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/SocketChannel channel2, Executor exc) {
48+
channelGroup = channelGroup2;
49+
channel = channel2;
4450
executor = exc;
4551
pipeline = new ChannelPipelineImpl(this);
4652
outboundMessageTypes = (Set<String>) new HashSet<String>();
4753
}
4854

49-
public void setChannel(AsynchronousSocketChannel sock) {
55+
public void setChannel(/*Asynchronous*/SocketChannel sock) {
5056
this.channel = sock;
5157
}
5258

@@ -63,12 +69,12 @@ public String name() {
6369

6470

6571
@Override
66-
public AsynchronousSocketChannel bind(SocketAddress localAddress) throws IOException {
72+
public /*Asynchronous*/SocketChannel bind(SocketAddress localAddress) throws IOException {
6773
return channel.bind(localAddress);
6874
}
6975

7076
@Override
71-
public void connect(SocketAddress remoteAddress) {
77+
public void connect(SocketAddress remoteAddress) throws IOException {
7278
channel.connect(remoteAddress);
7379
}
7480

@@ -91,23 +97,33 @@ public void close() throws IOException {
9197

9298

9399
@Override
94-
public Future<Integer> read(ByteBuffer buf) {
95-
return channel.read(buf);
100+
public /*Future<Integer>*/ int read(ByteBuffer buf) throws IOException {
101+
return channel.read(buf);
96102
}
97103

98104
@Override
99-
public Future<Integer> write(Object msg) {
100-
return channel.write((ByteBuffer) msg);
105+
public /*Future<Integer>*/int write(ByteBuffer msg) throws IOException {
106+
return channel.write(msg);
101107
}
102108

103109
@Override
104-
public void write(Object msg, CompletionHandler<Integer, Void> handler) {
105-
channel.write((ByteBuffer)msg, null, handler);
110+
public void write(ByteBuffer msg, CompletionHandler<Integer, Void> handler) {
111+
try {
112+
int res = channel.write(msg/*, null, handler*/);
113+
handler.completed(res, null);
114+
} catch (IOException e) {
115+
handler.failed(e, null);
116+
}
106117
}
107118

108119
@Override
109120
public void read(ByteBuffer buf, CompletionHandler<Integer, Void> handler) {
110-
channel.read(buf, null, handler);
121+
try {
122+
int res = channel.read(buf/*, null, handler*/);
123+
handler.completed(res, null);
124+
} catch (IOException e) {
125+
handler.failed(e, null);
126+
}
111127
}
112128

113129

@@ -117,7 +133,7 @@ public ChannelPipeline pipeline() {
117133
}
118134

119135
@Override
120-
public AsynchronousChannelGroup getChannelGroup() {
136+
public /*Asynchronous*/ChannelGroup getChannelGroup() {
121137
return channelGroup;
122138
}
123139

src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.commons.logging.LogFactory;
1515
import org.ros.concurrent.CancellableLoop;
1616
import org.ros.concurrent.CircularBlockingDeque;
17+
import org.ros.exception.RosRuntimeException;
1718
import org.ros.internal.message.Message;
1819
import org.ros.internal.message.MessageBufferPool;
1920
import org.ros.internal.message.MessageBuffers;
@@ -37,7 +38,6 @@ public class OutgoingMessageQueue<T> {
3738

3839
private final CircularBlockingDeque<T> deque;
3940
private final Writer writer;
40-
private final MessageBufferPool messageBufferPool;
4141
private final ByteBuffer latchedBuffer;
4242
private final Object mutex;
4343

@@ -105,7 +105,7 @@ public void failed(Throwable arg0, Void arg1) {
105105
public OutgoingMessageQueue(ExecutorService executorService, List<ChannelHandlerContext> ctxs) throws IOException {
106106
deque = new CircularBlockingDeque<T>(DEQUE_CAPACITY);
107107
writer = new Writer();
108-
messageBufferPool = new MessageBufferPool();
108+
//messageBufferPool = new MessageBufferPool();
109109
latchedBuffer = MessageBuffers.dynamicBuffer();
110110
mutex = new Object();
111111
latchMode = false;
@@ -151,7 +151,11 @@ private void writeLatchedMessage() {
151151
Iterator<ChannelHandlerContext> it = channels.iterator();
152152
while(it.hasNext()) {
153153
ChannelHandlerContext ctx = it.next();
154-
ctx.write(latchedBuffer);
154+
try {
155+
ctx.write(latchedBuffer);
156+
} catch (IOException e) {
157+
throw new RosRuntimeException(e);
158+
}
155159
}
156160
}
157161
}

0 commit comments

Comments
 (0)