Skip to content

Commit d6ddeda

Browse files
committed
adjusted queue sizes, uniform TCPnodelay
1 parent af06002 commit d6ddeda

File tree

8 files changed

+44
-124
lines changed

8 files changed

+44
-124
lines changed

src/main/java/org/ros/internal/node/server/BaseServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void run() {
5555
try {
5656
Socket datasocket = server.accept();
5757
// disable Nagles algoritm; do not combine small packets into larger ones
58-
datasocket.setTcpNoDelay(true);
58+
//datasocket.setTcpNoDelay(true);
5959
// wait 1 second before close; close blocks for 1 sec. and data can be sent
6060
datasocket.setSoLinger(true, 1);
6161
//

src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class IncomingMessageQueue<T> {
2323
* {@link IncomingMessageQueue#addListener(MessageListener, int)} which are
2424
* consumed by user provided {@link MessageListener}s.
2525
*/
26-
private static final int DEQUE_CAPACITY = 16384;
26+
private static final int DEQUE_CAPACITY = 8192;
2727

2828
private final MessageReceiver<T> messageReceiver;
2929
private final MessageDispatcher<T> messageDispatcher;

src/main/java/org/ros/internal/transport/queue/MessageReceiver.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
*/
2020
public class MessageReceiver<T> extends AbstractNamedChannelHandler {
2121

22-
private static final boolean DEBUG = true;
22+
private static final boolean DEBUG = false;
2323
private static final Log log = LogFactory.getLog(MessageReceiver.class);
2424

2525
private final CircularBlockingDeque<T> lazyMessages;
@@ -45,7 +45,6 @@ public Object channelRead(ChannelHandlerContext ctx, Object msg) throws Exceptio
4545
}
4646

4747

48-
4948
@Override
5049
public void exceptionCaught(ChannelHandlerContext arg0, Throwable arg1)
5150
throws Exception {
@@ -94,8 +93,6 @@ public void channelReadComplete(ChannelHandlerContext arg0) throws Exception {
9493

9594
}
9695

97-
98-
9996
@Override
10097
public void userEventTriggered(ChannelHandlerContext arg0, Object arg1)
10198
throws Exception {

src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
*/
3131
public class OutgoingMessageQueue<T> {
3232

33-
private static final boolean DEBUG = true;
33+
private static final boolean DEBUG = false;
3434
private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class);
3535

36-
private static final int DEQUE_CAPACITY = 16384;
36+
private static final int DEQUE_CAPACITY = 8192;
3737

3838
private final CircularBlockingDeque<T> deque;
3939
private final Writer writer;

src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void run() {
5050
//(/*(AsynchronousSocketChannel)*/channel/*.get()*/).setOption(StandardSocketOptions.TCP_NODELAY, true);
5151
channel.setSendBufferSize(4096000);
5252
channel.setReceiveBufferSize(4096000);
53-
channel.setTcpNoDelay(true);
53+
//channel.setTcpNoDelay(true);
5454
ChannelHandlerContext ctx = new ChannelHandlerContextImpl(channelGroup, channel/*.get()*/, exc);
5555
tcpserver.getSubscribers().add(ctx);
5656
// inject the handlers, start handshake

src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java

Lines changed: 8 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@
22

33
import java.io.IOException;
44

5-
import java.nio.ByteBuffer;
6-
75
import org.apache.commons.logging.Log;
86
import org.apache.commons.logging.LogFactory;
97

10-
import org.ros.internal.message.MessageBuffers;
11-
import org.ros.internal.system.Utility;
128
import org.ros.internal.transport.ChannelHandlerContext;
139

1410
/**
@@ -19,12 +15,11 @@
1915
*
2016
*/
2117
public class AsynchTCPWorker implements Runnable {
22-
private static final boolean DEBUG = true;
18+
private static final boolean DEBUG = false;
2319
private static final Log log = LogFactory.getLog(AsynchTCPWorker.class);
2420
public boolean shouldRun = true;
2521
private ChannelHandlerContext ctx;
2622
private Object waitHalt = new Object();
27-
//private MessageBufferPool pool = new MessageBufferPool();
2823

2924
public AsynchTCPWorker(ChannelHandlerContext ctx) throws IOException {
3025
this.ctx = ctx;
@@ -37,84 +32,21 @@ public AsynchTCPWorker(ChannelHandlerContext ctx) throws IOException {
3732
*/
3833
@Override
3934
public void run() {
40-
//final Object waitFinish = ctx.getChannelCompletionMutex();
4135
try {
4236
while(shouldRun) {
43-
//final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire();
44-
// initiate asynch read
45-
// If we get a read pending exception, try again
46-
//final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire();
47-
//buf.clear();
48-
//final CountDownLatch cdl = new CountDownLatch(1);
49-
//int res = ctx.read(buf);
50-
// seems like a -1 is generated when channel breaks, so stop
51-
// this worker on that case
52-
//if( res == -1) {
53-
// shouldRun = false;
54-
// if( DEBUG )
55-
// log.info("ROS AsynchTCPWorker CHANNEL BREAK, TERMINATING for "+ctx);
56-
//} else {
57-
// buf.flip();
58-
// Object reso = Utility.deserialize(buf);
5937
Object reso = ctx.read();
60-
61-
if( DEBUG )
38+
if( DEBUG )
6239
log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" Object:"+reso);
63-
try {
40+
try {
6441
ctx.pipeline().fireChannelRead(reso);
65-
} catch (Exception e) {
66-
if( DEBUG) {
42+
} catch (Exception e) {
43+
if( DEBUG) {
6744
log.info("Exception out of fireChannelRead",e);
6845
e.printStackTrace();
69-
}
70-
ctx.pipeline().fireExceptionCaught(e);
7146
}
72-
//}
73-
/*
74-
ctx.read(buf, new CompletionHandler<Integer, Void>() {
75-
@Override
76-
public void completed(Integer arg0, Void arg1) {
77-
buf.flip();
78-
Object res = Utility.deserialize(buf);
79-
//if( res == null ) {
80-
// cdl.countDown();
81-
// return;
82-
//}
83-
84-
if( DEBUG )
85-
log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" Object:"+res+" Result:"+arg0+","+arg1);
86-
try {
87-
ctx.pipeline().fireChannelRead(res);
88-
} catch (Exception e) {
89-
if( DEBUG) {
90-
log.info("Exception out of fireChannelRead",e);
91-
e.printStackTrace();
92-
}
93-
}
94-
cdl.countDown();
95-
96-
}
97-
@Override
98-
public void failed(Throwable arg0, Void arg1) {
99-
if( DEBUG ){
100-
log.info("AsynchTcpWorker read op failed:",arg0);
101-
arg0.printStackTrace();
102-
}
103-
try {
104-
ctx.pipeline().fireExceptionCaught(arg0);
105-
} catch (Exception e) {
106-
e.printStackTrace();
107-
}
108-
if( arg0 instanceof ClosedChannelException ) {
109-
shouldRun = false;
110-
}
111-
cdl.countDown();
112-
}
113-
});
114-
*/
115-
//cdl.await(); // readpendingexception if we overlap operations
116-
} // shouldRun
117-
47+
ctx.pipeline().fireExceptionCaught(e);
48+
}
49+
} // shouldRun
11850
} catch(Exception se) {
11951
log.error("AsynchTCPWorker terminating due to ",se);
12052
} finally {

src/main/java/org/ros/internal/transport/tcp/TcpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void addAllNamedChannelHandlers(List<NamedChannelHandler> namedChannelHan
7070
public Socket connect(String connectionName, SocketAddress socketAddress) throws Exception {
7171
//channel = /*Asynchronous*/SocketChannel.open(/*channelGroup*/);
7272
channel = new Socket();
73-
channel.setTcpNoDelay(false);
73+
//channel.setTcpNoDelay(true);
7474
channel.setSendBufferSize(4096000);
7575
channel.setSendBufferSize(4096000);
7676
//((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.SO_RCVBUF, 4096000);

src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
package org.ros.internal.transport.tcp;
22

33
import java.io.IOException;
4-
import java.io.ObjectOutputStream;
5-
import java.io.OutputStream;
6-
import java.nio.ByteBuffer;
7-
import java.nio.channels.CompletionHandler;
8-
import java.nio.channels.SocketChannel;
94

105
import org.apache.commons.logging.Log;
116
import org.apache.commons.logging.LogFactory;
@@ -24,6 +19,8 @@
2419

2520
/**
2621
* A {@link ChannelHandler} which will process the TCP server handshake.
22+
* Once an incoming channel read takes place the handshake handler is removed and the traffic
23+
* handler is placed in the pipeline
2724
*
2825
* @author jg
2926
*/
@@ -43,10 +40,13 @@ public TcpServerHandshakeHandler(TopicParticipantManager topicParticipantManager
4340

4441
@Override
4542
public void channelActive(ChannelHandlerContext ctx) {
46-
log.info("Channel active");
43+
if(DEBUG)
44+
log.info("Channel active");
4745
}
4846
/**
4947
* Channel read initiated by pipeline generated message
48+
* We make the assumption that the inbound object is of type ConnectionHeader
49+
* when talking to this handler
5050
*/
5151
@Override
5252
public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception {
@@ -63,7 +63,7 @@ public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception
6363
return e;
6464
}
6565
/**
66-
* Handle the handshake for a service
66+
* Handle the handshake for a service in response to channel read
6767
* @param ctx
6868
* @param incomingHeader
6969
* @throws IOException
@@ -87,7 +87,7 @@ private void handleServiceHandshake(ChannelHandlerContext ctx, ConnectionHeader
8787
}
8888
}
8989
/**
90-
* Handle the handshake for a typical (not a service) subscriber.
90+
* Handle the handshake for a typical (not a service) subscriber in response to a channel read.
9191
* @param ctx
9292
* @param incomingConnectionHeader
9393
* @throws InterruptedException
@@ -112,61 +112,51 @@ private void handleSubscriberHandshake(final ChannelHandlerContext ctx, final Co
112112

113113
ctx.write(outgoingBuffer);
114114

115-
/*
116-
ctx.write(outgoingBuffer, new CompletionHandler<Integer, Void>() {
117-
@Override
118-
public void completed(Integer arg0, Void arg1) {
119-
*/
120-
String nodeName = incomingConnectionHeader.getField(ConnectionHeaderFields.CALLER_ID);
121-
publisher.addSubscriber(new SubscriberIdentifier(NodeIdentifier.forName(nodeName), new TopicIdentifier(topicName)), ctx);
122-
// Once the handshake is complete, there will be nothing incoming on the
123-
// channel as we are only queueing outbound traffic to the subscriber, which is done by the OutgoingMessgequeue.
124-
// So, we remove the handler
125-
ctx.pipeline().remove(TcpServerPipelineFactory.HANDSHAKE_HANDLER);
126-
// Set this context ready to receive the message type specified
127-
synchronized(ctx.getMessageTypes()) {
115+
String nodeName = incomingConnectionHeader.getField(ConnectionHeaderFields.CALLER_ID);
116+
publisher.addSubscriber(new SubscriberIdentifier(NodeIdentifier.forName(nodeName), new TopicIdentifier(topicName)), ctx);
117+
// Once the handshake is complete, there will be nothing incoming on the
118+
// channel as we are only queueing outbound traffic to the subscriber, which is done by the OutgoingMessgequeue.
119+
// So, we remove the handler
120+
ctx.pipeline().remove(TcpServerPipelineFactory.HANDSHAKE_HANDLER);
121+
// Set this context ready to receive the message type specified
122+
synchronized(ctx.getMessageTypes()) {
128123
ctx.getMessageTypes().add(incomingConnectionHeader.getField(ConnectionHeaderFields.TYPE));
129-
}
130-
// The handshake is complete and the only task is to set the context ready, which will allow
131-
// the outbound queue to start sending messages.
132-
ctx.setReady(true);
124+
}
125+
// The handshake is complete and the only task is to set the context ready, which will allow
126+
// the outbound queue to start sending messages.
127+
ctx.setReady(true);
133128

134-
if( DEBUG ) {
135-
log.info("subscriber complete:"+outgoingBuffer);
136-
}
137-
/*
138-
}
139-
@Override
140-
public void failed(Throwable arg0, Void arg1) {
141-
log.info("Failed to perform handshake for:"+ctx);
142-
}
143-
*/
144-
//});
145-
146-
129+
if( DEBUG ) {
130+
log.info("subscriber complete:"+outgoingBuffer);
131+
}
132+
147133
}
148134

149135
@Override
150136
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
137+
if( DEBUG )
151138
log.info(this+" Handler added "+ctx);
152139

153140
}
154141

155142
@Override
156143
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
144+
if(DEBUG)
157145
log.info("Handler removed "+ctx);
158146

159147
}
160148

161149
@Override
162150
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
151+
if(DEBUG)
163152
log.info("Channel inactive "+ctx);
164153

165154
}
166155

167156

168157
@Override
169158
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
159+
if(DEBUG)
170160
log.info("channel read complete "+ctx);
171161

172162
}
@@ -180,6 +170,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable msg)throws Exce
180170
@Override
181171
public void userEventTriggered(ChannelHandlerContext ctx, Object event)
182172
throws Exception {
173+
if(DEBUG)
183174
log.info("User event triggered "+ctx+" "+event);
184175

185176
}

0 commit comments

Comments
 (0)