Skip to content

Commit 9b50e2a

Browse files
committed
Multiple handshake bug fix. Completion of original TODO of using TopicParticipantManager in DefaultSubscriber.
1 parent 332cb7b commit 9b50e2a

File tree

8 files changed

+96
-44
lines changed

8 files changed

+96
-44
lines changed

src/main/java/org/ros/address/AdvertiseAddress.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public AdvertiseAddress(String host, final int port) {
6868
}
6969

7070
public AdvertiseAddress(InetAddress host, int port) {
71-
this.host = host.getCanonicalHostName();
71+
this.host = host.getHostName();
7272
this.port = port;
7373
}
7474

src/main/java/org/ros/internal/node/client/Registrar.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,15 @@
2727

2828
/**
2929
* Manages topic, and service registrations of a {@link SlaveServer} with the
30-
* {@link MasterServer}.
30+
* {@link MasterServer}.<br/>
31+
* The primary activity here is to respond to events that occur when a publisher or subscriber is added
32+
* or removed. The fact that it is constructed with a MasterClient and an ExecutorService is a clue.
33+
* For instance, in the onSubscriberAdded event handler method, we contact the remote master to
34+
* register the subscriber. To do this we call updatePublishers on the subscriber
35+
* using the collection of publisherUris returned in the response to masterClient.<br/>
36+
* Passed DefaultSubscriber class creates an UpdatePublisherRunnable thread
37+
* which creates a SlaveClient of type SlaveRpcEndpointImpl to
38+
* contact the publishers. If successful, call signalOnMasterRegistrationSuccess for the subscriber.
3139
*
3240
* @author jg
3341
*/
@@ -170,7 +178,12 @@ public void run() {
170178
});
171179
}
172180
}
173-
181+
/**
182+
* Contact the remote master to register the subscriber. Call updatePublishers on the subscriber
183+
* using the collection of publisherUris returned in the response. Passed DefaultSubscriber class creates an
184+
* UpdatePublisherRunnable thread which creates a SlaveClient of type SlaveRpcEndpointImpl to
185+
* contact the publishers. If successful, signalOnMasterRegistrationSuccess for the subscriber.
186+
*/
174187
@Override
175188
public void onSubscriberAdded(final DefaultSubscriber<?> subscriber) {
176189
if (DEBUG) {

src/main/java/org/ros/internal/node/server/SlaveServer.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
* A remote client can request connections, get collections of publishers and subscriber and information
3636
* about the state of the bus including connections between publishers and subscribers. TcpRosServer is
3737
* the class that does most of the work and is wrapped by this class, which is here to provide the subset of
38-
* remotely invokable methods.
38+
* remotely invokable methods via reflection using the ServerInvokeMethod class.
39+
* @see ServerInvokeMethod
40+
* @see TcpRosServer
3941
* @author jg
4042
*/
4143
public class SlaveServer extends RpcServer {
@@ -111,7 +113,7 @@ public List<Object> getBusStats(String callerId) {
111113
* The fourth element is the protocol<br/>
112114
* The fifth element is the topic name.<br/>
113115
* @param callerId
114-
* @return
116+
* @return A List of Objects representing the above, packaged thusly to enable remote serialization delivery.
115117
*/
116118
public List<Object> getBusInfo(String callerId) {
117119
List<Object> busInfo = new ArrayList<Object>();
@@ -183,10 +185,13 @@ public int paramUpdate(GraphName parameterName, Object parameterValue) {
183185
return parameterManager.updateParameter(parameterName, parameterValue);
184186
}
185187
/**
186-
*
188+
* If there is a subscriber subscribed to this topicName, call updatePublishers on the subscriber
189+
* using the collection of publisherUris. DefaultSubscriber class creates an
190+
* UpdatePublisherRunnable thread which creates a SlaveClient of type SlaveRpcEndpointImpl to
191+
* contact the publisher.
187192
* @param callerId
188193
* @param topicName
189-
* @param publisherUris
194+
* @param publisherUris collection of InetSocketAddress of remote publishers to be updated.
190195
*/
191196
public void publisherUpdate(String callerId, String topicName, Collection<InetSocketAddress> publisherUris) {
192197
GraphName graphName = GraphName.of(topicName);
@@ -199,7 +204,7 @@ public void publisherUpdate(String callerId, String topicName, Collection<InetSo
199204
if(DEBUG) {
200205
log.info("Updating subscriber:"+subscriber);
201206
for(InetSocketAddress i: publisherUris)
202-
log.info("Publisher:"+i);
207+
log.info("PUBLISHER:"+i+" for:"+this);
203208
}
204209
}
205210
}

src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import org.apache.commons.logging.Log;
44
import org.apache.commons.logging.LogFactory;
5-
65
import org.ros.concurrent.ListenerGroup;
76
import org.ros.concurrent.SignalRunnable;
87
import org.ros.internal.node.server.NodeIdentifier;
@@ -24,13 +23,18 @@
2423
import java.util.concurrent.TimeUnit;
2524

2625
/**
27-
* Default implementation of a {@link Subscriber}.
26+
* Default implementation of a {@link Subscriber}.<br/>
27+
* Primary players are knownPublishers, which is a Set of PublisherIdentifiers,<br/>
28+
* and TcpClientManager, which has the NamedChannelHandlers.<br/>
29+
* Here, we also maintain the incomingMessageQueue, which contains MessageListeners of the type this
30+
* class is parameterized with.<br/>
31+
*
2832
*
2933
* @author jg
3034
*/
3135
public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Subscriber<T> {
32-
33-
private static final Log log = LogFactory.getLog(DefaultSubscriber.class);
36+
private static boolean DEBUG = true;
37+
private static final Log log = LogFactory.getLog(DefaultSubscriber.class);
3438

3539
/**
3640
* The maximum delay before shutdown will begin even if all
@@ -43,27 +47,35 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub
4347
private final NodeIdentifier nodeIdentifier;
4448
private final ScheduledExecutorService executorService;
4549
private final IncomingMessageQueue<T> incomingMessageQueue;
46-
private final Set<PublisherIdentifier> knownPublishers;
50+
// private final Set<PublisherIdentifier> knownPublishers;
4751
private final TcpClientManager tcpClientManager;
52+
private final TopicParticipantManager topicParticipantManager;
4853
private final Object mutex;
4954

5055
/**
5156
* Manages the {@link SubscriberListener}s for this {@link Subscriber}.
5257
*/
5358
private final ListenerGroup<SubscriberListener<T>> subscriberListeners;
5459

60+
//public static <S> DefaultSubscriber<S> newDefault(NodeIdentifier nodeIdentifier,
61+
// TopicDeclaration description, ScheduledExecutorService executorService) throws IOException {
62+
// return new DefaultSubscriber<S>(nodeIdentifier, description, executorService);
63+
//}
5564
public static <S> DefaultSubscriber<S> newDefault(NodeIdentifier nodeIdentifier,
56-
TopicDeclaration description, ScheduledExecutorService executorService) throws IOException {
57-
return new DefaultSubscriber<S>(nodeIdentifier, description, executorService);
65+
TopicDeclaration description,
66+
TopicParticipantManager topicParticipantManager,
67+
ScheduledExecutorService executorService) throws IOException {
68+
return new DefaultSubscriber<S>(nodeIdentifier, description, topicParticipantManager, executorService);
5869
}
59-
60-
private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration, ScheduledExecutorService executorService) throws IOException {
70+
private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration,
71+
TopicParticipantManager topicParticipantManager, ScheduledExecutorService executorService) throws IOException {
6172
super(topicDeclaration);
6273
this.nodeIdentifier = nodeIdentifier;
6374
this.executorService = executorService;
6475
incomingMessageQueue = new IncomingMessageQueue<T>(executorService);
65-
knownPublishers = new HashSet<PublisherIdentifier>();
76+
//knownPublishers = new HashSet<PublisherIdentifier>();
6677
tcpClientManager = TcpClientManager.getInstance(executorService);
78+
this.topicParticipantManager = topicParticipantManager;
6779
mutex = new Object();
6880
SubscriberHandshakeHandler<T> subscriberHandshakeHandler =
6981
new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(),
@@ -73,22 +85,26 @@ private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicD
7385
subscriberListeners.add(new DefaultSubscriberListener<T>() {
7486
@Override
7587
public void onMasterRegistrationSuccess(Subscriber<T> registrant) {
76-
log.info("Subscriber registered: " + DefaultSubscriber.this);
88+
if(DEBUG)
89+
log.info("Subscriber registered: " + DefaultSubscriber.this);
7790
}
7891

7992
@Override
8093
public void onMasterRegistrationFailure(Subscriber<T> registrant) {
81-
log.info("Subscriber registration failed: " + DefaultSubscriber.this);
94+
if(DEBUG)
95+
log.info("Subscriber registration failed: " + DefaultSubscriber.this);
8296
}
8397

8498
@Override
8599
public void onMasterUnregistrationSuccess(Subscriber<T> registrant) {
86-
log.info("Subscriber unregistered: " + DefaultSubscriber.this);
100+
if(DEBUG)
101+
log.info("Subscriber unregistered: " + DefaultSubscriber.this);
87102
}
88103

89104
@Override
90105
public void onMasterUnregistrationFailure(Subscriber<T> registrant) {
91-
log.info("Subscriber unregistration failed: " + DefaultSubscriber.this);
106+
if(DEBUG)
107+
log.info("Subscriber unregistration failed: " + DefaultSubscriber.this);
92108
}
93109
});
94110
}
@@ -119,27 +135,44 @@ public void addMessageListener(MessageListener<T> messageListener, int limit) {
119135
public void addMessageListener(MessageListener<T> messageListener) {
120136
addMessageListener(messageListener, 1);
121137
}
122-
123-
138+
/**
139+
* When the SlaveClient requests a topic from the publisher in UpdatePublisherRunnable, as
140+
* happens when the method updatePublishers is called here, this method is called back on reply from master.
141+
* TcpClientManager calls connect to the passed InetSocketAddress. After that, all the SubscriberListeners are
142+
* signaled with the new Publisher.
143+
* @param publisherIdentifier
144+
* @param address
145+
* @throws Exception
146+
*/
124147
public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) throws Exception {
125148
synchronized (mutex) {
126149
// TODO(damonkohler): If the connection is dropped, knownPublishers should
127150
// be updated.
128-
if (knownPublishers.contains(publisherIdentifier)) {
129-
return;
130-
}
131-
tcpClientManager.connect(toString(), address);
151+
//if (knownPublishers.contains(publisherIdentifier)) {
152+
// return;
153+
//}
154+
Collection<PublisherIdentifier> pubs = topicParticipantManager.getSubscriberConnections(this);
155+
if(pubs != null && pubs.contains(publisherIdentifier)) {
156+
log.info("Defaultsubscriber addPublisher topicParticipantManager CONTAINS "+publisherIdentifier+" at "+address);
157+
} else {
158+
log.info("Defaultsubscriber addPublisher topicParticipantManager DOES NOT CONTAIN "+publisherIdentifier+" at "+address);
159+
topicParticipantManager.addSubscriberConnection(this, publisherIdentifier);
160+
}
161+
tcpClientManager.connect(toString(), address);
132162
// TODO(damonkohler): knownPublishers is duplicate information that is
133163
// already available to the TopicParticipantManager.
134-
knownPublishers.add(publisherIdentifier);
164+
//knownPublishers.add(publisherIdentifier);
135165
signalOnNewPublisher(publisherIdentifier);
136166
}
137167
}
138168

139169
/**
140170
* Updates the list of {@link Publisher}s for the topic that this
141-
* {@link Subscriber} is interested in.
142-
*
171+
* {@link Subscriber} is interested in.<p/>
172+
* Creates UpdatePublisherRunnable of this classes generic type for each PublisherIdentifier.
173+
* Using executorService, spin the runnable which creates SlaveClient of type SlaveRpcEndpoint.
174+
* This is invoked from client Registrar when the onSubscriberAdded event occurs, and
175+
* from the SlaveServer when publisherUpdate is called.<br/>
143176
* @param publisherIdentifiers
144177
* {@link Collection} of {@link PublisherIdentifier}s for the
145178
* subscribed topic
@@ -279,4 +312,5 @@ public void run(SubscriberListener<T> listener) {
279312
public String toString() {
280313
return "Subscriber<" + getTopicDeclaration() + ">";
281314
}
315+
282316
}

src/main/java/org/ros/internal/node/topic/SubscriberFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public <T> Subscriber<T> newOrExisting(TopicDeclaration topicDeclaration) throws
6565
return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName);
6666
} else {
6767
DefaultSubscriber<T> subscriber =
68-
DefaultSubscriber.newDefault(nodeIdentifier, topicDeclaration, executorService);
68+
DefaultSubscriber.newDefault(nodeIdentifier, topicDeclaration, topicParticipantManager, executorService);
6969
subscriber.addSubscriberListener(new DefaultSubscriberListener<T>() {
7070
@Override
7171
public void onNewPublisher(Subscriber<T> subscriber, PublisherIdentifier publisherIdentifier) {

src/main/java/org/ros/internal/node/topic/TopicParticipantManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,22 @@ public class TopicParticipantManager {
3737
* A mapping from {@link Subscriber} to its connected
3838
* {@link PublisherIdentifier}s.
3939
*/
40-
private final HashMap<DefaultSubscriber<?>, List<PublisherIdentifier>> subscriberConnections;
40+
private final Map<DefaultSubscriber<?>, List<PublisherIdentifier>> subscriberConnections;
4141

4242
/**
4343
* A mapping from {@link Publisher} to its connected
4444
* {@link SubscriberIdentifier}s.
4545
*/
46-
private final HashMap<DefaultPublisher<?>,List<SubscriberIdentifier>> publisherConnections;
46+
private final Map<DefaultPublisher<?>,List<SubscriberIdentifier>> publisherConnections;
4747

4848
// TODO(damonkohler): Change to ListenerGroup.
4949
private TopicParticipantManagerListener listener;
5050

5151
public TopicParticipantManager() {
5252
publishers = new ConcurrentHashMap<GraphName, DefaultPublisher<?>>();
5353
subscribers = new ConcurrentHashMap<GraphName, DefaultSubscriber<?>>();
54-
subscriberConnections = new HashMap<DefaultSubscriber<?>, List<PublisherIdentifier>>();
55-
publisherConnections = new HashMap<DefaultPublisher<?>, List<SubscriberIdentifier>>();
54+
subscriberConnections = new ConcurrentHashMap<DefaultSubscriber<?>, List<PublisherIdentifier>>();
55+
publisherConnections = new ConcurrentHashMap<DefaultPublisher<?>, List<SubscriberIdentifier>>();
5656
}
5757

5858
public void setListener(TopicParticipantManagerListener listener) {

src/main/java/org/ros/internal/node/topic/UpdatePublisherRunnable.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
* @author jg
2121
*/
2222
class UpdatePublisherRunnable<MessageType> implements Runnable {
23-
private static boolean DEBUG = false;
23+
private static boolean DEBUG = true;
2424
private static final Log log = LogFactory.getLog(UpdatePublisherRunnable.class);
2525

2626
private final DefaultSubscriber<MessageType> subscriber;
@@ -48,24 +48,24 @@ public void run() {
4848
try {
4949
if(DEBUG)
5050
log.info("Attempting to create SlaveClient:"+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri());
51-
slaveClient = new SlaveClient(nodeIdentifier.getName(), publisherIdentifier.getNodeUri());
51+
slaveClient = new SlaveClient(nodeIdentifier.getName(), publisherIdentifier.getNodeUri());
5252
if(DEBUG) {
5353
log.info("SlaveClient created "+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri());
5454
log.info("Requesting topic name "+subscriber.getTopicName());
5555
}
56-
Response<ProtocolDescription> response =
56+
Response<ProtocolDescription> response =
5757
slaveClient.requestTopic(subscriber.getTopicName(), ProtocolNames.SUPPORTED);
58-
// If null there is no publisher for the requested topic
59-
if( response != null ) {
58+
// If null there is no publisher for the requested topic
59+
if( response != null ) {
6060
ProtocolDescription selected = response.getResult();
6161
if (ProtocolNames.SUPPORTED.contains(selected.getName())) {
6262
subscriber.addPublisher(publisherIdentifier, selected.getAddress());
6363
} else {
6464
log.error("Publisher returned unsupported protocol selection: " + response);
6565
}
66-
} else {
66+
} else {
6767
log.error("There are NO publishers available for topic "+subscriber.getTopicName());
68-
}
68+
}
6969
} catch (Exception e) {
7070
// TODO(damonkohler): Retry logic is needed at the RPC layer.
7171
log.error(e);

src/main/java/org/ros/internal/transport/tcp/TcpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public Socket connect(String connectionName, SocketAddress socketAddress) throws
9494
AsynchTCPWorker uworker = new AsynchTCPWorker(ctx);
9595
channelGroup.getExecutorService().execute(uworker);
9696
// notify pipeline we connected (or failed via exceptionCaught and runtime exception)
97-
ctx.pipeline().fireChannelActive();
97+
//ctx.pipeline().fireChannelActive();
9898
// recall we keep the list of contexts in TcpClientManager
9999
if (DEBUG) {
100100
log.info("TcpClient Connected with ChannelHandlerContext "+ctx);

0 commit comments

Comments
 (0)