Skip to content

Commit 2532b89

Browse files
committed
Netty removal debug
1 parent b97ec94 commit 2532b89

13 files changed

+197
-267
lines changed

src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22

33
import org.apache.commons.logging.Log;
44
import org.apache.commons.logging.LogFactory;
5-
//import org.jboss.netty.channel.ChannelHandlerContext;
6-
//import org.jboss.netty.channel.ChannelPipeline;
7-
//import org.jboss.netty.channel.MessageEvent;
85
import org.ros.internal.transport.BaseClientHandshakeHandler;
96
import org.ros.internal.transport.ChannelHandlerContext;
107
import org.ros.internal.transport.ChannelPipeline;
@@ -16,13 +13,15 @@
1613
import org.ros.node.topic.Subscriber;
1714

1815
import java.io.IOException;
19-
import java.net.SocketAddress;
2016
import java.util.concurrent.ExecutorService;
2117

2218
/**
2319
* Performs a handshake with the connected {@link Publisher} and connects the
2420
* {@link Publisher} to the {@link IncomingMessageQueue} on success.
2521
*
22+
* In the AsynchTcpWorker thread that handles the read for each channel, it strobes the pipeline
23+
* with the read notification and the handler here takes care of the processing.
24+
*
2625
* @author jg
2726
*
2827
* @param <T>
@@ -44,6 +43,8 @@ public SubscriberHandshakeHandler(ConnectionHeader outgoingConnectionHeader,
4443

4544
@Override
4645
protected void onSuccess(ConnectionHeader incomingConnectionHeader, ChannelHandlerContext ctx) {
46+
if( DEBUG )
47+
log.info("SubscriberHandshakeHandler.onSuccess:"+ctx+" "+incomingConnectionHeader);
4748
ChannelPipeline pipeline = ctx.pipeline();
4849
pipeline.remove(SubscriberHandshakeHandler.this);
4950
NamedChannelHandler namedChannelHandler = incomingMessageQueue.getMessageReceiver();

src/main/java/org/ros/internal/transport/ChannelHandlerContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public interface ChannelHandlerContext {
108108
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
109109
* {@link Channel}.
110110
*/
111-
Future<Integer> read();
111+
Future<Integer> read(ByteBuffer buf);
112112

113113
/**
114114
* Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
@@ -149,7 +149,8 @@ public interface ChannelHandlerContext {
149149
*
150150
*/
151151
AsynchronousChannelGroup getChannelGroup();
152-
152+
153+
153154

154155

155156
}

src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,11 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
3030
AsynchronousSocketChannel channel;
3131
ChannelPipeline pipeline;
3232

33-
ByteBuffer buf;
34-
int MAXBUF = 2000000;
3533

3634
public ChannelHandlerContextImpl(AsynchronousChannelGroup grp, AsynchronousSocketChannel ch, Executor exc) {
3735
channelGroup = grp;
3836
channel = ch;
3937
executor = exc;
40-
buf = ByteBuffer.allocate(MAXBUF);
4138
pipeline = new ChannelPipelineImpl(this);
4239
}
4340

@@ -86,7 +83,7 @@ public void close() throws IOException {
8683

8784

8885
@Override
89-
public Future<Integer> read() {
86+
public Future<Integer> read(ByteBuffer buf) {
9087
return channel.read(buf);
9188
}
9289

@@ -137,4 +134,5 @@ public String toString() {
137134
return new String("ChannelHandlerContext:"+channel+" "+channelGroup+" "+executor+" "+pipeline);
138135
}
139136

137+
140138
}

src/main/java/org/ros/internal/transport/ChannelPipeline.java

Lines changed: 52 additions & 115 deletions
Large diffs are not rendered by default.

src/main/java/org/ros/internal/transport/ChannelPipelineImpl.java

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import java.util.concurrent.ConcurrentHashMap;
1010
import java.util.concurrent.LinkedBlockingDeque;
1111

12+
import org.ros.exception.RosRuntimeException;
13+
1214
/**
1315
* Implementation of the ChannelPipeline interface to process the requests via the pluggable ChannelHandlers
1416
* named in the queue. The pipeline is per ChannelHandlerContext, per channel as it represents state in the form of the
@@ -37,26 +39,48 @@ public Iterator<Entry<String, ChannelHandler>> iterator() {
3739
@Override
3840
public ChannelPipeline addFirst(String name, ChannelHandler handler) {
3941
queue.addFirst(new AbstractMap.SimpleEntry<String, ChannelHandler>(name, handler));
42+
try {
43+
queue.peekFirst().getValue().handlerAdded(ctx);
44+
} catch (Exception e) {
45+
throw new RosRuntimeException(e);
46+
}
4047
return this;
4148
}
4249

4350

4451
@Override
4552
public ChannelPipeline addLast(String name, ChannelHandler handler) {
4653
queue.addLast(new AbstractMap.SimpleEntry<String, ChannelHandler>(name, handler));
54+
try {
55+
queue.peekLast().getValue().handlerAdded(ctx);
56+
} catch (Exception e) {
57+
throw new RosRuntimeException(e);
58+
}
4759
return this;
4860
}
4961

5062

51-
63+
/**
64+
* Remove and fire event handlerRemoved to handler after removed one
65+
*/
5266
@Override
5367
public ChannelPipeline remove(ChannelHandler handler) {
5468
Iterator<Map.Entry<String,ChannelHandler>> it = iterator();
69+
boolean found = false;
5570
while(it.hasNext()) {
5671
Map.Entry<String,ChannelHandler> me = it.next();
57-
if( me.getValue().equals(handler) ) {
72+
if( !found && me.getValue().equals(handler) ) {
5873
it.remove();
59-
break;
74+
found = true;
75+
continue;
76+
}
77+
if( found ) {
78+
try {
79+
me.getValue().handlerRemoved(ctx);
80+
break;
81+
} catch (Exception e) {
82+
throw new RosRuntimeException(e);
83+
}
6084
}
6185
}
6286
return this;
@@ -65,24 +89,40 @@ public ChannelPipeline remove(ChannelHandler handler) {
6589
@Override
6690
public ChannelHandler remove(String name) {
6791
Iterator<Map.Entry<String,ChannelHandler>> it = iterator();
68-
Map.Entry<String,ChannelHandler> me = null;
92+
ChannelHandler val = null;
93+
boolean found = false;
6994
while(it.hasNext()) {
70-
me = it.next();
71-
if( me.getKey().equals(name) ) {
95+
Map.Entry<String,ChannelHandler> me = it.next();
96+
if( !found && me.getKey().equals(name) ) {
97+
val = me.getValue();
7298
it.remove();
73-
break;
99+
found = true;
100+
continue;
101+
}
102+
if( found ) {
103+
try {
104+
me.getValue().handlerRemoved(ctx);
105+
break;
106+
} catch (Exception e) {
107+
throw new RosRuntimeException(e);
108+
}
74109
}
75110
}
76-
if( me == null)
77-
return null;
78-
return me.getValue();
111+
return val;
79112
}
80113

81114

82115
@Override
83116
public ChannelHandler removeFirst() {
84117
Map.Entry<String,ChannelHandler> me = queue.removeFirst();
85118
if( me == null ) return null;
119+
if( queue.isEmpty())
120+
return me.getValue();
121+
try {
122+
queue.peekFirst().getValue().handlerRemoved(ctx);
123+
} catch (Exception e) {
124+
throw new RuntimeException(e);
125+
}
86126
return me.getValue();
87127
}
88128

src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,3 @@
1-
/*
2-
* Copyright (C) 2011 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.transport.queue;
182

193
import org.ros.concurrent.CircularBlockingDeque;
@@ -24,7 +8,8 @@
248
import java.util.concurrent.ExecutorService;
259

2610
/**
27-
* @author damonkohler@google.com (Damon Kohler)
11+
* Created in default subscriber to handle the incoming messages
12+
* @author jg
2813
*/
2914
public class IncomingMessageQueue<T> {
3015

@@ -37,7 +22,7 @@ public class IncomingMessageQueue<T> {
3722
* {@link IncomingMessageQueue#addListener(MessageListener, int)} which are
3823
* consumed by user provided {@link MessageListener}s.
3924
*/
40-
private static final int DEQUE_CAPACITY = 16;
25+
private static final int DEQUE_CAPACITY = 16384;
4126

4227
private final MessageReceiver<T> messageReceiver;
4328
private final MessageDispatcher<T> messageDispatcher;

src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.io.IOException;
44
import java.nio.ByteBuffer;
5-
import java.nio.channels.AsynchronousChannelGroup;
65
import java.nio.channels.Channel;
76
import java.nio.channels.CompletionHandler;
87
import java.util.Iterator;
@@ -11,35 +10,29 @@
1110

1211
import org.apache.commons.logging.Log;
1312
import org.apache.commons.logging.LogFactory;
14-
15-
//import org.jboss.netty.buffer.ChannelBuffer;
16-
//import org.jboss.netty.channel.Channel;
17-
//import org.jboss.netty.channel.group.ChannelGroup;
18-
//import org.jboss.netty.channel.group.ChannelGroupFuture;
19-
//import org.jboss.netty.channel.group.ChannelGroupFutureListener;
20-
//import org.jboss.netty.channel.group.DefaultChannelGroup;
21-
2213
import org.ros.concurrent.CancellableLoop;
2314
import org.ros.concurrent.CircularBlockingDeque;
24-
import org.ros.exception.RosRuntimeException;
2515
import org.ros.internal.message.MessageBufferPool;
2616
import org.ros.internal.message.MessageBuffers;
2717
import org.ros.internal.system.Utility;
2818
import org.ros.internal.transport.ChannelHandlerContext;
2919

30-
31-
3220
/**
21+
* The outgoing message queue of a publisher processing type T messages.
22+
* A writer will be spun up in the executor that processes a deque with message entries,
23+
* serializing them to a bytebuffer for outbound asynch channel transport.
24+
* Each publisher created generates this writer which addresses all channels in the context array
25+
* it was created with.
26+
* @author jg
3327
*/
3428
public class OutgoingMessageQueue<T> {
3529

3630
private static final boolean DEBUG = true;
3731
private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class);
3832

39-
private static final int DEQUE_CAPACITY = 16;
33+
private static final int DEQUE_CAPACITY = 16384;
4034

4135
private final CircularBlockingDeque<T> deque;
42-
private final AsynchronousChannelGroup channelGroup;
4336
private final Writer writer;
4437
private final MessageBufferPool messageBufferPool;
4538
private final ByteBuffer latchedBuffer;
@@ -49,39 +42,43 @@ public class OutgoingMessageQueue<T> {
4942
private T latchedMessage;
5043

5144
private List<ChannelHandlerContext> channels;
52-
45+
/**
46+
* This class is submitted to the executor to process the deque entries
47+
* and serialize them to the latched buffer for outbound publisher channel
48+
* @author jg
49+
*
50+
*/
5351
private final class Writer extends CancellableLoop {
5452
@Override
5553
public void loop() throws InterruptedException {
5654
T message = deque.takeFirst();
5755
final ByteBuffer buffer = messageBufferPool.acquire();
56+
//messageBufferPool.release(buffer);
57+
latchedBuffer.clear();
5858
Utility.serialize(message, buffer);
5959
if (DEBUG ) {
60-
log.info(String.format("Writing %d bytes.", buffer.limit()));
60+
log.info(String.format("Writing %d bytes.", buffer.position()));
6161
}
62-
// we have to wait until the write
63-
// operation is complete before returning the buffer to the pool.
6462
Iterator<ChannelHandlerContext> it = channels.iterator();
6563
while(it.hasNext()) {
6664
ChannelHandlerContext ctx = it.next();
6765
ctx.write(buffer, new CompletionHandler<Integer, Void>() {
68-
@Override
69-
public void completed(Integer a, Void b) {
70-
messageBufferPool.release(buffer);
71-
}
72-
@Override
73-
public void failed(Throwable arg0, Void arg1) {
74-
log.error("Failed write");
75-
throw new RosRuntimeException(arg0);
76-
}
77-
});
66+
@Override
67+
public void completed(Integer arg0, Void arg1) {
68+
messageBufferPool.release(buffer);
69+
}
70+
71+
@Override
72+
public void failed(Throwable arg0, Void arg1) {
73+
throw new RuntimeException(arg0);
74+
}
75+
});
7876
}
7977
}
8078
}
8179

8280
public OutgoingMessageQueue(ExecutorService executorService, List<ChannelHandlerContext> ctxs) throws IOException {
8381
deque = new CircularBlockingDeque<T>(DEQUE_CAPACITY);
84-
channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
8582
writer = new Writer();
8683
messageBufferPool = new MessageBufferPool();
8784
latchedBuffer = MessageBuffers.dynamicBuffer();
@@ -105,7 +102,7 @@ public boolean getLatchMode() {
105102
*/
106103
public void add(T message) {
107104
deque.addLast(message);
108-
setLatchedMessage(message);
105+
//setLatchedMessage(message);
109106
}
110107

111108
private void setLatchedMessage(T message) {
@@ -119,7 +116,6 @@ private void setLatchedMessage(T message) {
119116
*/
120117
public void shutdown() {
121118
writer.cancel();
122-
channelGroup.shutdown();
123119
}
124120

125121

@@ -142,7 +138,4 @@ public int getNumberOfChannels() {
142138
return channels.size();
143139
}
144140

145-
public AsynchronousChannelGroup getChannelGroup() {
146-
return channelGroup;
147-
}
148141
}

src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void run() {
6767
// inject the handlers, start handshake
6868
tcpserver.getFactoryStack().inject(ctx);
6969
ctx.pipeline().fireChannelRegistered();
70-
AsynchTCPWorker uworker = new AsynchTCPWorker(channel.get(), tcpserver);
70+
AsynchTCPWorker uworker = new AsynchTCPWorker(ctx, channel.get());
7171
exc.execute(uworker);
7272
if( DEBUG ) {
7373
log.info("ROS Asynch transport server worker starting");

0 commit comments

Comments
 (0)