Skip to content

Commit 302458d

Browse files
committed
RE-architect TcpClientManager, handshake bug FIXED!
1 parent 9b50e2a commit 302458d

File tree

7 files changed

+34
-32
lines changed

7 files changed

+34
-32
lines changed

src/main/java/org/ros/internal/node/service/DefaultServiceClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla
9595
// TODO(damonkohler): Support non-persistent connections.
9696
connectionHeader.addField(ConnectionHeaderFields.PERSISTENT, "1");
9797
connectionHeader.merge(serviceDeclaration.toConnectionHeader());
98-
tcpClientManager = TcpClientManager.getInstance(executorService);
98+
tcpClientManager = new TcpClientManager/*.getInstance*/(executorService);
9999
ServiceClientHandshakeHandler<T, S> serviceClientHandshakeHandler =
100100
new ServiceClientHandshakeHandler<T, S>(connectionHeader, responseListeners, executorService);
101101
handshakeLatch = new HandshakeLatch();

src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicD
7474
this.executorService = executorService;
7575
incomingMessageQueue = new IncomingMessageQueue<T>(executorService);
7676
//knownPublishers = new HashSet<PublisherIdentifier>();
77-
tcpClientManager = TcpClientManager.getInstance(executorService);
77+
tcpClientManager = new TcpClientManager/*.getInstance*/(executorService);
7878
this.topicParticipantManager = topicParticipantManager;
7979
mutex = new Object();
8080
SubscriberHandshakeHandler<T> subscriberHandshakeHandler =

src/main/java/org/ros/internal/transport/tcp/ChannelInitializerFactoryStack.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,19 @@
1717
*
1818
*/
1919
public class ChannelInitializerFactoryStack {
20-
private static boolean DEBUG = false;
20+
private static boolean DEBUG = true;
2121
private static final Log log = LogFactory.getLog(ChannelInitializerFactoryStack.class);
2222

2323
private LinkedBlockingDeque<ChannelInitializer> queue = new LinkedBlockingDeque<ChannelInitializer>();
2424

2525
public void addFirst(ChannelInitializer ch) {
2626
if(DEBUG)
27-
log.info("Adding First ChannelInitializer "+ch+" queue size="+queue.size());
27+
log.info("Adding First ChannelInitializer:"+ch+" queue size="+queue.size());
2828
queue.addFirst(ch);
2929
}
3030
public void addLast(ChannelInitializer ch) {
3131
if(DEBUG)
32-
log.info("Adding Last ChannelInitializer "+ch+" queue size="+queue.size());
32+
log.info("Adding Last ChannelInitializer:"+ch+" queue size="+queue.size());
3333
queue.addLast(ch);
3434
}
3535
/**
@@ -40,12 +40,12 @@ public void addLast(ChannelInitializer ch) {
4040
*/
4141
public void inject(ChannelHandlerContext ctx) throws Exception {
4242
if(DEBUG)
43-
log.info("Injecting ChannelHandlerContext "+ctx+" queue size="+queue.size());
43+
log.info("Injecting ChannelHandlerContext:"+ctx+" queue size="+queue.size());
4444
Iterator<ChannelInitializer> it = queue.iterator();
4545
while(it.hasNext()) {
4646
ChannelInitializer ch = it.next();
4747
if(DEBUG)
48-
log.info("ChannelInitializer.initChannel "+ch);
48+
log.info("ChannelInitializer initChannel for ChannelHandlerContext:"+ctx+" ChannelInitializer:"+ch);
4949
ch.initChannel(ctx);
5050
}
5151
}

src/main/java/org/ros/internal/transport/tcp/TcpClient.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,25 @@ public void setKeepAlive(boolean value) throws IOException {
5656
}
5757

5858
public void addNamedChannelHandler(NamedChannelHandler namedChannelHandler) {
59-
namedChannelHandlers.add(namedChannelHandler);
59+
if (DEBUG) {
60+
log.info("TcpClient:"+this+" adding NamedChannelHandler:"+namedChannelHandler);
61+
}
62+
namedChannelHandlers.add(namedChannelHandler);
6063
}
6164

6265
public void addAllNamedChannelHandlers(List<NamedChannelHandler> namedChannelHandlers) {
63-
this.namedChannelHandlers.addAll(namedChannelHandlers);
66+
if (DEBUG) {
67+
for(NamedChannelHandler n: namedChannelHandlers)
68+
log.info("TcpClient:"+this+" will add NamedChannelHandler:"+n);
69+
}
70+
this.namedChannelHandlers.addAll(namedChannelHandlers);
6471
}
6572

6673
public ChannelHandlerContext getContext() { return ctx; }
6774

6875
public Socket connect(String connectionName, SocketAddress socketAddress) throws Exception {
6976
if (DEBUG) {
70-
log.info("TcpClient attempting connection:"+connectionName+" to socket:" + socketAddress);
77+
log.info("TcpClient:"+this+" attempting connection:"+connectionName+" to socket:" + socketAddress);
7178
}
7279
//channel = /*Asynchronous*/SocketChannel.open(/*channelGroup*/);
7380
channel = new Socket();
@@ -94,10 +101,10 @@ public Socket connect(String connectionName, SocketAddress socketAddress) throws
94101
AsynchTCPWorker uworker = new AsynchTCPWorker(ctx);
95102
channelGroup.getExecutorService().execute(uworker);
96103
// notify pipeline we connected (or failed via exceptionCaught and runtime exception)
97-
//ctx.pipeline().fireChannelActive();
104+
ctx.pipeline().fireChannelActive();
98105
// recall we keep the list of contexts in TcpClientManager
99106
if (DEBUG) {
100-
log.info("TcpClient Connected with ChannelHandlerContext "+ctx);
107+
log.info("TcpClient:"+this+" Connected with ChannelHandlerContext "+ctx);
101108
}
102109
//} else {
103110
// We expect the first connection to succeed. If not, fail fast.

src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,11 @@
66
import java.util.ArrayList;
77
import java.util.Collection;
88
import java.util.List;
9-
import java.util.concurrent.ArrayBlockingQueue;
10-
import java.util.concurrent.ConcurrentHashMap;
11-
import java.util.concurrent.Executor;
129
import java.util.concurrent.ExecutorService;
13-
import java.util.concurrent.ScheduledExecutorService;
1410

1511
import org.apache.commons.logging.Log;
1612
import org.apache.commons.logging.LogFactory;
1713

18-
1914
/**
2015
* TcpClientManager manages TCP clients which are the subscriber and service clients that communicate with
2116
* remote peers outside master domain.
@@ -32,20 +27,20 @@ public class TcpClientManager {
3227
private final Collection<TcpClient> tcpClients;
3328
private final List<NamedChannelHandler> namedChannelHandlers;
3429

35-
private static ConcurrentHashMap<ExecutorService, TcpClientManager> executors = new ConcurrentHashMap<ExecutorService, TcpClientManager>(1024);
30+
//private static ConcurrentHashMap<ExecutorService, TcpClientManager> executors = new ConcurrentHashMap<ExecutorService, TcpClientManager>(1024);
3631

37-
public static TcpClientManager getInstance(ExecutorService exc) {
38-
synchronized(TcpClientManager.class) {
39-
TcpClientManager tcm = executors.get(exc);
40-
if( tcm == null ) {
41-
tcm = new TcpClientManager(exc);
42-
executors.put(exc, tcm);
43-
}
44-
return tcm;
45-
}
46-
}
32+
//public static TcpClientManager getInstance(ExecutorService exc) {
33+
// synchronized(TcpClientManager.class) {
34+
// TcpClientManager tcm = executors.get(exc);
35+
// if( tcm == null ) {
36+
// tcm = new TcpClientManager(exc);
37+
// executors.put(exc, tcm);
38+
// }
39+
// return tcm;
40+
// }
41+
//}
4742

48-
private TcpClientManager(ExecutorService executor) {
43+
public TcpClientManager(ExecutorService executor) {
4944
this.channelGroup = new ChannelGroupImpl(executor);/*AsynchronousChannelGroup.withThreadPool(executor);*/
5045
this.tcpClients = new ArrayList<TcpClient>();
5146
this.namedChannelHandlers = new ArrayList<NamedChannelHandler>();

src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
* @author jg
1313
*/
1414
public class TcpClientPipelineFactory extends ChannelInitializer {
15-
public static boolean DEBUG = false;
15+
public static boolean DEBUG = true;
1616
private static final Log log = LogFactory.getLog(TcpClientPipelineFactory.class);
1717
public static final String LENGTH_FIELD_BASED_FRAME_DECODER = "LengthFieldBasedFrameDecoder";
1818
public static final String LENGTH_FIELD_PREPENDER = "LengthFieldPrepender";

src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,9 @@ public void setup() {
154154
new IncomingMessageQueue<std_msgs.String>(executorService);
155155
secondIncomingMessageQueue =
156156
new IncomingMessageQueue<std_msgs.String>(executorService);
157-
firstTcpClientManager = TcpClientManager.getInstance(executorService);
157+
firstTcpClientManager = new TcpClientManager/*.getInstance*/(executorService);
158158
firstTcpClientManager.addNamedChannelHandler(firstIncomingMessageQueue.getMessageReceiver());
159-
secondTcpClientManager = TcpClientManager.getInstance(executorService);
159+
secondTcpClientManager = new TcpClientManager/*.getInstance*/(executorService);
160160
secondTcpClientManager.addNamedChannelHandler(secondIncomingMessageQueue.getMessageReceiver());
161161
} catch(Exception e) { throw new RosRuntimeException(e); }
162162
}

0 commit comments

Comments
 (0)