Skip to content

Commit 9c92ad5

Browse files
committed
Fixed node disconnect/reconnect.
1 parent 52c5db3 commit 9c92ad5

File tree

8 files changed

+95
-102
lines changed

8 files changed

+95
-102
lines changed

src/main/java/org/ros/internal/node/client/SlaveClient.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,3 @@
1-
/*
2-
* Copyright (C) 2011 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.client;
182

193
import org.ros.internal.node.response.IntegerResultFactory;
@@ -34,10 +18,12 @@
3418

3519
import java.util.Collection;
3620
import java.util.List;
37-
import java.util.Map;
21+
3822

3923
/**
40-
* @author damonkohler@google.com (Damon Kohler)
24+
* Create a client to the RpcEndpoint via SlaveRpcEndpointImpl
25+
* from the socket address passed in constructor.
26+
* @author jg
4127
*/
4228
public class SlaveClient extends Client<SlaveRpcEndpoint> {
4329

src/main/java/org/ros/internal/node/server/RpcServer.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@
88
import org.ros.internal.system.Process;
99

1010
import java.io.IOException;
11-
import java.net.InetAddress;
1211
import java.net.InetSocketAddress;
1312
import java.net.URI;
14-
import java.util.concurrent.Callable;
1513
import java.util.concurrent.CountDownLatch;
1614
import java.util.concurrent.TimeUnit;
1715

@@ -33,18 +31,16 @@ public abstract class RpcServer {
3331

3432
public RpcServer(BindAddress bindAddress, AdvertiseAddress advertiseAddress) throws IOException {
3533
this.advertiseAddress = advertiseAddress;
36-
/*
37-
this.advertiseAddress.setPortCallable(new Callable<Integer>() {
38-
@Override
39-
public Integer call() throws Exception {
40-
return server.getPort();
41-
}
42-
});
43-
*/
4434
}
4535

46-
36+
/**
37+
* Invoke a method via remote call.
38+
* @param rri The RemoteRequestInterface passed from remote client
39+
* @return The Object result of invocation
40+
* @throws Exception
41+
*/
4742
public abstract Object invokeMethod(RemoteRequestInterface rri) throws Exception;
43+
4844
/**
4945
* Start up the remote calling server.
5046
*

src/main/java/org/ros/internal/node/server/master/MasterRegistrationManagerImpl.java

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,16 @@
1-
/*
2-
* Copyright (C) 2012 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.server.master;
182

193
import java.net.InetSocketAddress;
204
import java.util.Collection;
215
import java.util.Collections;
226
import java.util.HashMap;
237
import java.util.Map;
8+
249
import java.util.concurrent.ConcurrentHashMap;
2510

2611
import org.apache.commons.logging.Log;
2712
import org.apache.commons.logging.LogFactory;
13+
2814
import org.ros.internal.node.service.ServiceIdentifier;
2915
import org.ros.master.client.TopicSystemState;
3016
import org.ros.namespace.GraphName;
@@ -37,10 +23,10 @@
3723
* <p>
3824
* This class is not thread-safe.
3925
*
40-
* @author khughes@google.com (Keith M. Hughes)
26+
* @author jg
4127
*/
4228
public class MasterRegistrationManagerImpl {
43-
29+
private static boolean DEBUG = true;
4430
private static final Log log = LogFactory.getLog(MasterRegistrationManagerImpl.class);
4531

4632
/**
@@ -65,6 +51,7 @@ public class MasterRegistrationManagerImpl {
6551
*/
6652
private final MasterRegistrationListener listener;
6753

54+
6855
public MasterRegistrationManagerImpl(MasterRegistrationListener listener) {
6956
this.listener = listener;
7057
nodes = new HashMap<GraphName, NodeRegistrationInfo>();
@@ -425,21 +412,27 @@ private NodeRegistrationInfo obtainNodeRegistrationInfo(GraphName nodeName, Inet
425412
}
426413
log.info("Replacing node "+node.getNodeSlaveUri()+" with new requested "+nodeSlaveUri);
427414
// The node is switching slave URIs, so we need a new one.
428-
potentiallyDeleteNode(node);
415+
//potentiallyDeleteNode(node);
416+
nodes.remove(nodeName);
429417
cleanupNode(node);
418+
NodeRegistrationInfo newNode = new NodeRegistrationInfo(nodeName, nodeSlaveUri);
419+
nodes.put(nodeName, newNode);
420+
// Try to reach old node via SlaveClient to shut it down
421+
/*
430422
try {
431423
listener.onNodeReplacement(node);
432424
} catch (Exception e) {
433425
// No matter what, we want to keep going
434426
log.error("Error during onNodeReplacement call", e);
435427
}
428+
*/
429+
return newNode;
430+
} else {
431+
// no existing node
432+
node = new NodeRegistrationInfo(nodeName, nodeSlaveUri);
433+
nodes.put(nodeName, node);
434+
return node;
436435
}
437-
438-
// Either no existing node, or the old node needs to go away
439-
node = new NodeRegistrationInfo(nodeName, nodeSlaveUri);
440-
nodes.put(nodeName, node);
441-
442-
return node;
443436
}
444437

445438
/**
@@ -463,6 +456,39 @@ private void cleanupNode(NodeRegistrationInfo node) {
463456
}
464457
}
465458

459+
/**
460+
* A node is being replaced. Change the NodeRegistrationInfo to the new address
461+
*
462+
* @param node
463+
* the node being replaced
464+
* @param newNode
465+
* the new node
466+
*/
467+
private void replaceNode(NodeRegistrationInfo node, NodeRegistrationInfo newNode) {
468+
boolean found = false;
469+
for (TopicRegistrationInfo topic : node.getPublishers()) {
470+
found = topic.removePublisher(node);
471+
if( found ) {
472+
if( DEBUG )
473+
log.info("Replacing publisher:"+node+" "+newNode+" "+topic);
474+
topic.addPublisher(newNode, topic.getMessageType());
475+
}
476+
}
477+
478+
for (TopicRegistrationInfo topic : node.getSubscribers()) {
479+
found = topic.removeSubscriber(node);
480+
if( found ) {
481+
if( DEBUG )
482+
log.info("Replacing subscriber:"+node+" "+newNode+" "+topic);
483+
topic.addSubscriber(newNode, topic.getMessageType());
484+
}
485+
}
486+
487+
// TODO: service?
488+
for (ServiceRegistrationInfo service : node.getServices()) {
489+
services.remove(service.getServiceName());
490+
}
491+
}
466492
/**
467493
* Remove a node from registration if it no longer has any registrations.
468494
*

src/main/java/org/ros/internal/node/server/master/MasterServer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ protected void contactSubscriberForPublisherUpdate(InetSocketAddress subscriberS
276276
try {
277277
client = new SlaveClient(MASTER_NODE_NAME, subscriberSlaveUri);
278278
} catch (IOException e) {
279-
log.error("MasterServer cannot construct slave cleint to unknown host "+subscriberSlaveUri,e);
279+
log.error("MasterServer cannot construct slave client to unknown host "+subscriberSlaveUri,e);
280280
throw new RosRuntimeException(e);
281281
}
282282
client.publisherUpdate(topicName, publisherUris);
@@ -490,7 +490,10 @@ public List<Object> getPublishedTopics(GraphName caller, GraphName subgraph) {
490490
return result;
491491
}
492492
}
493-
493+
/**
494+
* Create a new SlaveClient with passed NodeRegistrationInfo
495+
* Triggered after shutdown and removal of publishers and subscribers from old slave node
496+
*/
494497
@Override
495498
public void onNodeReplacement(NodeRegistrationInfo nodeInfo) {
496499
// A node in the registration manager is being replaced. Contact the node

src/main/java/org/ros/internal/node/server/master/NodeRegistrationInfo.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,15 @@
1-
/*
2-
* Copyright (C) 2012 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.server.master;
182

193
import org.ros.namespace.GraphName;
204

21-
import java.net.InetAddress;
225
import java.net.InetSocketAddress;
23-
import java.net.URI;
246
import java.util.HashSet;
257
import java.util.Set;
268

279
/**
2810
* Information a master needs about a node.
2911
*
30-
* @author khughes@google.com (Keith M. Hughes)
12+
* @author jg
3113
*/
3214
public class NodeRegistrationInfo {
3315

src/main/java/org/ros/internal/node/server/master/TopicRegistrationInfo.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,3 @@
1-
/*
2-
* Copyright (C) 2012 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.server.master;
182

193

@@ -27,6 +11,8 @@
2711
/**
2812
* All information known to the manager about a topic.
2913
*
14+
* @author jg
15+
*
3016
*/
3117
public class TopicRegistrationInfo {
3218

src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,29 @@ public void run() {
5151
// If we get a read pending exception, try again
5252
final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire();
5353
buf.clear();
54-
final CountDownLatch cdl = new CountDownLatch(1);
54+
//final CountDownLatch cdl = new CountDownLatch(1);
5555
int res = ctx.read(buf);
56-
buf.flip();
57-
Object reso = Utility.deserialize(buf);
58-
if( DEBUG )
59-
log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" result:"+res+" Object:"+reso);
56+
// seems like a -1 is generated when channel breaks, so stop
57+
// this worker on that case
58+
if( res == -1) {
59+
shouldRun = false;
60+
if( DEBUG )
61+
log.info("ROS AsynchTCPWorker CHANNEL BREAK, TERMINATING for "+ctx);
62+
} else {
63+
buf.flip();
64+
Object reso = Utility.deserialize(buf);
65+
if( DEBUG )
66+
log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" result:"+res+" Object:"+reso);
67+
try {
68+
ctx.pipeline().fireChannelRead(reso);
69+
} catch (Exception e) {
70+
if( DEBUG) {
71+
log.info("Exception out of fireChannelRead",e);
72+
e.printStackTrace();
73+
}
74+
ctx.pipeline().fireExceptionCaught(e);
75+
}
76+
}
6077
/*
6178
ctx.read(buf, new CompletionHandler<Integer, Void>() {
6279
@Override
@@ -103,17 +120,13 @@ public void failed(Throwable arg0, Void arg1) {
103120
} // shouldRun
104121

105122
} catch(Exception se) {
106-
if( se instanceof SocketException ) {
107-
log.error("Received SocketException, connection reset..");
108-
} else {
109-
log.error("Remote invocation failure ",se);
110-
}
123+
log.error("AsynchTCPWorker terminating due to ",se);
111124
} finally {
112125
try {
113126
if( DEBUG )
114127
log.info("<<<<<<<<<< Datasocket closing >>>>>>>>");
115-
ctx.close();
116128
ctx.setReady(false);
129+
ctx.close();
117130
} catch (IOException e) {}
118131
}
119132
synchronized(waitHalt) {

src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception
5050
if( DEBUG ) {
5151
log.info("TcpServerHandshakeHandler channelRead:"+e);
5252
}
53+
// check for null, possible fault on bad connect
5354
ConnectionHeader incomingHeader = (ConnectionHeader)e;
5455
if (incomingHeader.hasField(ConnectionHeaderFields.SERVICE)) {
5556
handleServiceHandshake(ctx, incomingHeader);

0 commit comments

Comments
 (0)