Skip to content

Commit 332cb7b

Browse files
committed
Maintenance. Work toward removing transposed dual subscriber bug.
1 parent 2d7ab16 commit 332cb7b

13 files changed

+143
-87
lines changed

src/main/java/org/ros/internal/node/server/SlaveServer.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.ros.internal.node.server;
22

3+
import org.apache.commons.logging.Log;
4+
import org.apache.commons.logging.LogFactory;
35
import org.ros.address.AdvertiseAddress;
46
import org.ros.address.BindAddress;
57
import org.ros.internal.node.client.MasterClient;
@@ -28,10 +30,17 @@
2830
import java.util.concurrent.ScheduledExecutorService;
2931

3032
/**
33+
* SlaveServer is the remote execution endpoint.<br/> This class is reflected for its invokable
34+
* methods and that is made available to remote clients as the callable remote procedures.<br/>
35+
* A remote client can request connections, get collections of publishers and subscriber and information
36+
* about the state of the bus including connections between publishers and subscribers. TcpRosServer is
37+
* the class that does most of the work and is wrapped by this class, which is here to provide the subset of
38+
* remotely invokable methods.
3139
* @author jg
3240
*/
3341
public class SlaveServer extends RpcServer {
34-
42+
private static boolean DEBUG = true;
43+
private static final Log log = LogFactory.getLog(SlaveServer.class);
3544
private final GraphName nodeName;
3645
private final MasterClient masterClient;
3746
private final TopicParticipantManager topicParticipantManager;
@@ -53,10 +62,14 @@ public SlaveServer(GraphName nodeName, BindAddress tcpRosBindAddress,
5362
invokableMethods = new ServerInvokeMethod(this.getClass().getName(), 0);
5463
} catch (ClassNotFoundException e) {
5564
throw new IOException(e);
56-
}
57-
65+
}
5866
this.tcpRosServer =
5967
new TcpRosServer(tcpRosBindAddress, tcpRosAdvertiseAddress, topicParticipantManager, serviceManager, executorService);
68+
if(DEBUG) {
69+
log.info("ADDRESSES SlaveServer ctor:"+nodeName+" TCPBind:"+tcpRosBindAddress+", TCPRosAdv:"+tcpRosAdvertiseAddress+", RPCBind:"+rpcBindAddress+", RPCAdv:"+rpcAdvertiseAddress);
70+
log.info("MANAGERS SlaveServer ctor:"+nodeName+" MasterClient:"+master+" TopicParticipantManager:"+topicParticipantManager+
71+
" ServiceManager:"+serviceManager+" ParameterManager:"+parameterManager+" SchedulaedExecutorService:"+executorService);
72+
}
6073
}
6174

6275
public AdvertiseAddress getTcpRosAdvertiseAddress() {
@@ -89,7 +102,17 @@ public void shutdown() throws IOException {
89102
public List<Object> getBusStats(String callerId) {
90103
throw new UnsupportedOperationException();
91104
}
92-
105+
/**
106+
* We are returning a list of ArrayList of strings. Each ArrayList of string will
107+
* contain 5 elements representing subscriber or publisher bus information.<br/>
108+
* The first element of each list is a monotonically increasing integer.<br/>
109+
* The second element is the node identifier of the opposite party, sub for pub and pub for sub.<br/>
110+
* The third element is o for subscriber, i for publisher<br/>
111+
* The fourth element is the protocol<br/>
112+
* The fifth element is the topic name.<br/>
113+
* @param callerId
114+
* @return
115+
*/
93116
public List<Object> getBusInfo(String callerId) {
94117
List<Object> busInfo = new ArrayList<Object>();
95118
// The connection ID field is opaque to the user. A monotonically increasing
@@ -159,7 +182,12 @@ public Collection<DefaultPublisher<?>> getPublications() {
159182
public int paramUpdate(GraphName parameterName, Object parameterValue) {
160183
return parameterManager.updateParameter(parameterName, parameterValue);
161184
}
162-
185+
/**
186+
*
187+
* @param callerId
188+
* @param topicName
189+
* @param publisherUris
190+
*/
163191
public void publisherUpdate(String callerId, String topicName, Collection<InetSocketAddress> publisherUris) {
164192
GraphName graphName = GraphName.of(topicName);
165193
if (topicParticipantManager.hasSubscriber(graphName)) {
@@ -168,6 +196,11 @@ public void publisherUpdate(String callerId, String topicName, Collection<InetSo
168196
Collection<PublisherIdentifier> identifiers =
169197
PublisherIdentifier.newCollectionFromUris(publisherUris, topicDeclaration);
170198
subscriber.updatePublishers(identifiers);
199+
if(DEBUG) {
200+
log.info("Updating subscriber:"+subscriber);
201+
for(InetSocketAddress i: publisherUris)
202+
log.info("Publisher:"+i);
203+
}
171204
}
172205
}
173206
/**
@@ -183,13 +216,17 @@ public ProtocolDescription requestTopic(String topicName, Collection<String> pro
183216
// TODO(damonkohler): Use NameResolver.
184217
// Canonicalize topic name.
185218
GraphName graphName = GraphName.of(topicName).toGlobal();
219+
if( DEBUG )
220+
log.info("Requesting topic:"+topicName+" for GraphName:"+graphName);
186221
if (!topicParticipantManager.hasPublisher(graphName)) {
187222
//throw new ServerException("No publishers for topic: " + graphName);
188223
return null;
189224
}
190225
for (String protocol : protocols) {
191226
if (protocol.equals(ProtocolNames.TCPROS)) {
192227
try {
228+
if( DEBUG )
229+
log.info("Requested topic:"+topicName+" for GraphName:"+graphName+" returning:"+tcpRosServer.getAdvertiseAddress());
193230
return new TcpRosProtocolDescription(tcpRosServer.getAdvertiseAddress());
194231
} catch (Exception e) {
195232
throw new ServerException(e);

src/main/java/org/ros/internal/node/topic/DefaultPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
*/
3838
public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publisher<T> {
3939

40-
private static final boolean DEBUG = false;
40+
private static final boolean DEBUG = true;
4141
private static final Log log = LogFactory.getLog(DefaultPublisher.class);
4242

4343
/**

src/main/java/org/ros/internal/node/topic/SubscriberHandshake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
public class SubscriberHandshake extends BaseClientHandshake {
1818

19-
private static final boolean DEBUG = false;
19+
private static final boolean DEBUG = true;
2020
private static final Log log = LogFactory.getLog(SubscriberHandshake.class);
2121

2222
public SubscriberHandshake(ConnectionHeader outgoingConnectionHeader) {

src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* the {@link Subscriber} may only subscribe to messages of this type
2929
*/
3030
class SubscriberHandshakeHandler<T> extends BaseClientHandshakeHandler {
31-
private static boolean DEBUG = false;
31+
private static boolean DEBUG = true;
3232
private static final Log log = LogFactory.getLog(SubscriberHandshakeHandler.class);
3333

3434
private final IncomingMessageQueue<T> incomingMessageQueue;
@@ -38,15 +38,15 @@ public SubscriberHandshakeHandler(ConnectionHeader outgoingConnectionHeader,
3838
super(new SubscriberHandshake(outgoingConnectionHeader), executorService);
3939
this.incomingMessageQueue = incomingMessageQueue;
4040
if( DEBUG )
41-
log.info("subscriberhandshakeHandler ctor:"+this);
41+
log.info("SubscriberHandshakeHandler ctor:"+this+" IncomingMessageQueue:"+incomingMessageQueue+" outgoingConnectionHeader:"+outgoingConnectionHeader);
4242
}
4343
/**
4444
* Triggered from BaseClientHandshakeHandler channelRead
4545
*/
4646
@Override
4747
protected void onSuccess(ConnectionHeader incomingConnectionHeader, ChannelHandlerContext ctx) {
4848
if( DEBUG )
49-
log.info("SubscriberHandshakeHandler.onSuccess:"+ctx+" "+incomingConnectionHeader);
49+
log.info("SubscriberHandshakeHandler onSuccess for:"+this+" ChannalHandlerContext:"+ctx+" incomingConnectionHeader:"+incomingConnectionHeader);
5050
ChannelPipeline pipeline = ctx.pipeline();
5151
pipeline.remove(getName());
5252
NamedChannelHandler namedChannelHandler = incomingMessageQueue.getMessageReceiver();
@@ -62,7 +62,7 @@ protected void onSuccess(ConnectionHeader incomingConnectionHeader, ChannelHandl
6262
*/
6363
@Override
6464
protected void onFailure(String errorMessage, ChannelHandlerContext ctx) throws IOException {
65-
log.info("Subscriber handshake failed: " + errorMessage);
65+
log.info("Subscriber handshake failed for:"+this+" ChannelHandlerContext:"+ctx+" with error:" + errorMessage);
6666
ctx.setReady(false);
6767
ctx.close();
6868
}
@@ -73,32 +73,30 @@ public String getName() {
7373
}
7474

7575

76-
public void exceptionCaught(ChannelHandlerContext arg0, Throwable arg1) throws Exception {
77-
onFailure(arg1.getMessage(), arg0);
76+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable arg1) throws Exception {
77+
onFailure(arg1.getMessage(), ctx);
7878
if( DEBUG )
79-
log.info("SubscriberHandshakeHandler.exception caught:"+arg0+" "+arg1);
79+
log.info("SubscriberHandshakeHandler exceptionCaught for:"+this+" ChannelHandlerContext:"+ctx+" error:"+arg1);
8080
}
8181

8282
@Override
83-
public void handlerAdded(ChannelHandlerContext arg0) throws Exception {
83+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
8484
if( DEBUG )
85-
log.info("SubscriberHandshakeHandler.handlerAdded:"+arg0);
85+
log.info("SubscriberHandshakeHandler handlerAdded for:"+this+" ChannelHandlerContext:"+ctx);
8686
}
8787

8888
@Override
89-
public void handlerRemoved(ChannelHandlerContext arg0) throws Exception {
89+
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
9090
if( DEBUG )
91-
log.info("SubscriberHandshakeHandler.handlerRemoved:"+arg0);
91+
log.info("SubscriberHandshakeHandler handlerRemoved for:"+this+" ChannelHandlerContext:"+ctx);
9292

9393
}
9494

95-
@Override
96-
public void userEventTriggered(ChannelHandlerContext ctx, Object event)
97-
throws Exception {
95+
@Override
96+
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
9897
if( DEBUG )
99-
log.info("SubscriberHandshakeHandler.userEventTriggered:"+ctx+" "+event);
100-
101-
}
98+
log.info("SubscriberHandshakeHandler userEventTriggered for:"+this+" ChannelHandlerContext"+ctx+" "+event);
99+
}
102100

103101

104102
}

src/main/java/org/ros/internal/node/topic/TopicParticipantManager.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,3 @@
1-
/*
2-
* Copyright (C) 2011 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.topic;
182

193
import org.apache.commons.logging.Log;
@@ -34,9 +18,10 @@
3418
*
3519
* @author kwc@willowgarage.com (Ken Conley)
3620
* @author damonkohler@google.com (Damon Kohler)
21+
* @author jg
3722
*/
3823
public class TopicParticipantManager {
39-
24+
private static boolean DEBUG = true;
4025
private static Log log = LogFactory.getLog(TopicParticipantManager.class);
4126
/**
4227
* A mapping from topic name to {@link Subscriber}.
@@ -91,6 +76,8 @@ public DefaultSubscriber<?> getSubscriber(GraphName topicName) {
9176
}
9277

9378
public void addPublisher(DefaultPublisher<?> publisher) {
79+
if(DEBUG)
80+
log.info("Adding publisher:"+publisher+" to "+this);
9481
publishers.put(publisher.getTopicName(), publisher);
9582
if (listener != null) {
9683
listener.onPublisherAdded(publisher);
@@ -105,6 +92,8 @@ public void removePublisher(DefaultPublisher<?> publisher) {
10592
}
10693

10794
public void addSubscriber(DefaultSubscriber<?> subscriber) {
95+
if(DEBUG)
96+
log.info("Adding subscriber:"+subscriber+" to "+this);
10897
subscribers.put(subscriber.getTopicName(), subscriber);
10998
if (listener != null) {
11099
listener.onSubscriberAdded(subscriber);
@@ -119,7 +108,8 @@ public void removeSubscriber(DefaultSubscriber<?> subscriber) {
119108
}
120109

121110
public void addSubscriberConnection(DefaultSubscriber<?> subscriber, PublisherIdentifier publisherIdentifier) {
122-
//log.info("sub:"+subscriber+" pub:"+publisherIdentifier);
111+
if(DEBUG)
112+
log.info("Connecting subscriber:"+subscriber+" to publisher Identifier:"+publisherIdentifier+" for "+this);
123113
List<PublisherIdentifier> pubs = subscriberConnections.get(subscriber);
124114
//for(PublisherIdentifier p: pubs) log.info("Pub: "+p);
125115
if( pubs == null ) {
@@ -140,6 +130,8 @@ public void removeSubscriberConnection(DefaultSubscriber<?> subscriber, Publishe
140130
}
141131

142132
public void addPublisherConnection(DefaultPublisher<?> publisher, SubscriberIdentifier subscriberIdentifier) {
133+
if(DEBUG)
134+
log.info("Connecting publisher:"+publisher+" to subscriber Identifier:"+subscriberIdentifier+" for "+this);
143135
List<SubscriberIdentifier> subs = publisherConnections.get(publisher);
144136
if( subs == null) {
145137
subs = new ArrayList<SubscriberIdentifier>();

src/main/java/org/ros/internal/node/topic/UpdatePublisherRunnable.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
* @author jg
2121
*/
2222
class UpdatePublisherRunnable<MessageType> implements Runnable {
23-
23+
private static boolean DEBUG = false;
2424
private static final Log log = LogFactory.getLog(UpdatePublisherRunnable.class);
2525

2626
private final DefaultSubscriber<MessageType> subscriber;
@@ -46,10 +46,13 @@ public UpdatePublisherRunnable(DefaultSubscriber<MessageType> subscriber,
4646
public void run() {
4747
SlaveClient slaveClient;
4848
try {
49-
log.info("Attempting to create SlaveClient:"+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri());
49+
if(DEBUG)
50+
log.info("Attempting to create SlaveClient:"+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri());
5051
slaveClient = new SlaveClient(nodeIdentifier.getName(), publisherIdentifier.getNodeUri());
51-
log.info("Slave client created "+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri());
52-
log.info("Requesting topic name "+subscriber.getTopicName());
52+
if(DEBUG) {
53+
log.info("SlaveClient created "+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri());
54+
log.info("Requesting topic name "+subscriber.getTopicName());
55+
}
5356
Response<ProtocolDescription> response =
5457
slaveClient.requestTopic(subscriber.getTopicName(), ProtocolNames.SUPPORTED);
5558
// If null there is no publisher for the requested topic

src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*
2424
*/
2525
public abstract class BaseClientHandshakeHandler extends AbstractNamedChannelHandler {
26-
protected static boolean DEBUG = false;
26+
protected static boolean DEBUG = true;
2727
private static final Log log = LogFactory.getLog(BaseClientHandshakeHandler.class);
2828
private final ClientHandshake clientHandshake;
2929
private final ListenerGroup<ClientHandshakeListener> clientHandshakeListeners;
@@ -46,13 +46,13 @@ public void addListener(ClientHandshakeListener clientHandshakeListener) {
4646
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
4747
//final ByteBuffer bb = MessageBuffers.dynamicBuffer();
4848
if( DEBUG ) {
49-
log.info("Preparing OutgoingConnectionHeader:"+clientHandshake.getOutgoingConnectionHeader());
49+
log.info("BaseClientHandshakeHandler channelActive for ChannelHandlerContext:"+ctx+" for:"+this+" Preparing OutgoingConnectionHeader:"+clientHandshake.getOutgoingConnectionHeader());
5050
}
5151

5252
ctx.write(clientHandshake.getOutgoingConnectionHeader());
5353

5454
if( DEBUG )
55-
log.info("BaseClientHandshakeHandler channelActive "+ctx+" OutgoingConnectionHeader reply to master complete");
55+
log.info("BaseClientHandshakeHandler channelActive for ChannelHandlerContext "+ctx+" OutgoingConnectionHeader reply to master complete for:"+this);
5656
/*
5757
Utility.serialize(clientHandshake.getOutgoingConnectionHeader(), bb);
5858
@@ -73,7 +73,7 @@ public void failed(Throwable arg0, Void arg1) {
7373

7474
@Override
7575
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
76-
log.info("Channel inactive"+ctx);
76+
log.info("Channel inactive ChannelHandlerContext:"+ctx+" for:"+this);
7777
}
7878

7979
@Override
@@ -92,7 +92,7 @@ public Object channelRead(ChannelHandlerContext ctx, Object buff) throws Excepti
9292
@Override
9393
public void channelReadComplete(ChannelHandlerContext arg0) throws Exception {
9494
if( DEBUG )
95-
log.info("SubscriberHandshakeHandler.channelReadComplete:"+arg0);
95+
log.info("SubscriberHandshakeHandler.channelReadComplete:"+arg0+" for:"+this);
9696

9797
}
9898
/**

src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
3939
/*Asynchronous*/Socket/*Channel*/ channel;
4040
ChannelPipeline pipeline;
4141
boolean ready = false;
42-
Object mutex = new Object();
42+
private Object mutex = new Object();
4343
Set<String> outboundMessageTypes;
4444
InputStream is = null;
4545
OutputStream os = null;
@@ -188,13 +188,15 @@ public boolean isReady() {
188188
* Object to synchronize read and write completion for the channel in this context, since we will have
189189
* multiple outbound writers accessing the same channel
190190
*/
191+
@Override
191192
public Object getChannelCompletionMutex() { return mutex; }
192193

193194
/**
194195
* Get the type of messages we want to send to the attached subscriber, based on the handshakes
195196
* received.
196197
* @return The HashSet of message type as String
197198
*/
199+
@Override
198200
public Set<String> getMessageTypes() { return outboundMessageTypes; }
199201

200202

src/main/java/org/ros/internal/transport/ChannelPipelineImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
*/
2121
public class ChannelPipelineImpl implements ChannelPipeline {
2222

23-
LinkedBlockingDeque<Entry<String, ChannelHandler>> queue = new LinkedBlockingDeque<Entry<String, ChannelHandler>>();
23+
private LinkedBlockingDeque<Entry<String, ChannelHandler>> queue = new LinkedBlockingDeque<Entry<String, ChannelHandler>>();
2424
private ChannelHandlerContext ctx;
2525

2626
public ChannelPipelineImpl(ChannelHandlerContext ctx) {
2727
this.ctx = ctx;
2828
}
2929
//public ChannelPipelineImpl() {}
3030

31-
public void setContext(ChannelHandlerContext ctx) { this.ctx = ctx; }
31+
//public void setContext(ChannelHandlerContext ctx) { this.ctx = ctx; }
3232

3333

3434
@Override

0 commit comments

Comments
 (0)