Skip to content

Commit 2d7ab16

Browse files
committed
Fixed double subscriber ChannelHandlerContext bug. Tightened up and dekrufted code.
1 parent a03aaa3 commit 2d7ab16

22 files changed

+154
-96
lines changed

src/main/java/org/ros/concurrent/EventDispatcher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
* the listener type
2525
*/
2626
public class EventDispatcher<T> extends CancellableLoop {
27-
2827
private final T listener;
2928
private final CircularBlockingDeque<SignalRunnable<T>> events;
3029

@@ -35,11 +34,14 @@ public EventDispatcher(T listener, int queueCapacity) {
3534

3635
public void signal(final SignalRunnable<T> signalRunnable) {
3736
events.addLast(signalRunnable);
37+
//System.out.println("Event length addLast="+events.length()+" ***"+Thread.currentThread().getName());
3838
}
3939

4040
@Override
4141
public void loop() throws InterruptedException {
4242
SignalRunnable<T> signalRunnable = events.takeFirst();
43+
//System.out.println("Event length loop preRun="+events.length()+" ***"+Thread.currentThread().getName());
4344
signalRunnable.run(listener);
45+
//System.out.println("Event length loop postRun="+events.length()+" ***"+Thread.currentThread().getName());
4446
}
4547
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.ros.exception;
2+
3+
import org.ros.internal.node.response.StatusCode;
4+
5+
/**
6+
* Remote exception indication a remote resource, such as a publisher, was not found and we want
7+
* to toss a checked exception to handle that.
8+
* @author jg Copyright (C) NeoCoreTechs 2018
9+
*/
10+
public class RemoteNotFoundException extends RosException {
11+
private static final long serialVersionUID = 2514173639723076472L;
12+
13+
public RemoteNotFoundException(final Throwable throwable) {
14+
super(throwable);
15+
}
16+
17+
public RemoteNotFoundException(final String message, final Throwable throwable) {
18+
super(message, throwable);
19+
}
20+
21+
public RemoteNotFoundException(final String message) {
22+
super(message);
23+
}
24+
25+
public RemoteNotFoundException(StatusCode statusCode, String message) {
26+
super(statusCode.toString()+" "+message);
27+
}
28+
}

src/main/java/org/ros/internal/node/topic/DefaultPublisher.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ public DefaultPublisher(NodeIdentifier nodeIdentifier, TopicDeclaration topicDec
6565
this.messageFactory = messageFactory;
6666
this.subscribers = arrayBlockingQueue;
6767
outgoingMessageQueue = new OutgoingMessageQueue<T>(executorService, arrayBlockingQueue);
68+
if(DEBUG)
69+
log.info("DefaultPublisher contructed with "+outgoingMessageQueue.getNumberOfChannels()+" channels.");
6870
listeners = new ListenerGroup<PublisherListener<T>>(executorService);
6971
listeners.add(new DefaultPublisherListener<T>() {
7072
@Override
@@ -154,7 +156,7 @@ public ConnectionHeader finishHandshake(ConnectionHeader incomingHeader) {
154156
if (DEBUG) {
155157
//log.info("Subscriber handshake header: " + incomingHeader);
156158
//log.info("Publisher handshake header: " + topicDefinitionHeader);
157-
log.info("%%%%%%%%%%%%%%%% Handshake Complete %%%%%%%%%%%%%%%");
159+
log.info("%%%%%%%%%%%%%%%% Finishing Handshake with "+outgoingMessageQueue.getNumberOfChannels()+" channels. %%%%%%%%%%%%%%%");
158160
}
159161
// TODO(damonkohler): Return errors to the subscriber over the wire.
160162
String incomingType = incomingHeader.getField(ConnectionHeaderFields.TYPE);
@@ -195,11 +197,14 @@ public ConnectionHeader finishHandshake(ConnectionHeader incomingHeader) {
195197
*/
196198
public void addSubscriber(SubscriberIdentifier subscriberIdentifer, ChannelHandlerContext ctx) {
197199
if (DEBUG) {
198-
log.info(String.format("Adding subscriber %s channel %s to publisher %s.",
200+
log.info(String.format("Adding subscriber %s ChannelHandlerContext %s to publisher %s.",
199201
subscriberIdentifer, ctx, this));
200202
}
201203
//outgoingMessageQueue.addChannel(ctx);
202204
subscribers.add(ctx);
205+
if (DEBUG) {
206+
log.info("Current number of subscribers:"+subscribers.size());
207+
}
203208
signalOnNewSubscriber(subscriberIdentifer);
204209
}
205210

src/main/java/org/ros/internal/node/topic/SubscriberHandshake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* Handshake logic from the subscriber side of a topic connection.
1111
* The publisher receives the request for fields associated with the topic.
1212
* The subscriber receives the return from the publisher with the available fields and the
13-
* MD5 checksum to verifiy the message and complete the handshake.
13+
* MD5 checksum to verify the message and complete the handshake.
1414
*
1515
* @author jg
1616
*/

src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* the {@link Subscriber} may only subscribe to messages of this type
2929
*/
3030
class SubscriberHandshakeHandler<T> extends BaseClientHandshakeHandler {
31-
private static boolean DEBUG = true;
31+
private static boolean DEBUG = false;
3232
private static final Log log = LogFactory.getLog(SubscriberHandshakeHandler.class);
3333

3434
private final IncomingMessageQueue<T> incomingMessageQueue;

src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*
2424
*/
2525
public abstract class BaseClientHandshakeHandler extends AbstractNamedChannelHandler {
26-
protected static boolean DEBUG = true;
26+
protected static boolean DEBUG = false;
2727
private static final Log log = LogFactory.getLog(BaseClientHandshakeHandler.class);
2828
private final ClientHandshake clientHandshake;
2929
private final ListenerGroup<ClientHandshakeListener> clientHandshakeListeners;

src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
3636
private static final boolean DEBUG = false;
3737
private static final Log log = LogFactory.getLog(ChannelHandlerContextImpl.class);
3838
/*Asynchronous*/ChannelGroup channelGroup;
39-
Executor executor;
4039
/*Asynchronous*/Socket/*Channel*/ channel;
4140
ChannelPipeline pipeline;
4241
boolean ready = false;
@@ -45,10 +44,9 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
4544
InputStream is = null;
4645
OutputStream os = null;
4746

48-
public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/Socket channel2, Executor exc) {
47+
public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/Socket channel2) {
4948
channelGroup = channelGroup2;
5049
channel = channel2;
51-
executor = exc;
5250
pipeline = new ChannelPipelineImpl(this);
5351
outboundMessageTypes = (Set<String>) new HashSet<String>();
5452
}
@@ -59,7 +57,7 @@ public void setChannel(/*Asynchronous*/Socket/*Channel*/ sock) {
5957

6058
@Override
6159
public Executor executor() {
62-
return executor;
60+
return channelGroup.getExecutorService();
6361
}
6462

6563
@Override
@@ -172,7 +170,7 @@ public Socket channel() {
172170

173171
@Override
174172
public String toString() {
175-
return new String("ChannelHandlerContext:"+channel+" "+channelGroup+" "+executor+" "+pipeline+" ready:"+ready);
173+
return new String("ChannelHandlerContext:"+channel+" ChannelGroup:"+channelGroup+" ChannelPipeline:"+pipeline+" ready:"+ready);
176174
}
177175

178176
@Override

src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,11 @@ public void addListener(MessageListener<T> messageListener, int limit) {
7070
* @return the newly allocated {@link SignalRunnable}
7171
*/
7272
private SignalRunnable<MessageListener<T>> newSignalRunnable(final T lazyMessage) {
73+
//System.out.println("newSignalrunnable:"+lazyMessage+" ***"+Thread.currentThread().getName());
7374
return new SignalRunnable<MessageListener<T>>() {
7475
@Override
7576
public void run(MessageListener<T> messageListener) {
77+
//System.out.println("Run Signalrunnable:"+lazyMessage+" ***"+Thread.currentThread().getName());
7678
messageListener.onNewMessage(lazyMessage);
7779
}
7880
};

src/main/java/org/ros/internal/transport/queue/MessageReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public String getName() {
3535
@Override
3636
public Object channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
3737
if (DEBUG) {
38-
log.info(String.format("Received message:"+msg));
38+
log.info(String.format("Received message:"+msg+" ***"+Thread.currentThread().getName()));
3939
}
4040
lazyMessages.addLast((T) msg);
4141
return msg;

src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class OutgoingMessageQueue<T> {
3333
private static final boolean DEBUG = false;
3434
private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class);
3535

36-
private static final int DEQUE_CAPACITY = 256;
36+
private static final int DEQUE_CAPACITY = 16;
3737

3838
private final CircularBlockingDeque<T> deque;
3939
private final Writer writer;
@@ -60,6 +60,8 @@ public void loop() throws InterruptedException {
6060
// log.info(String.format("Writing %d bytes.", buffer.position()));
6161
//}
6262
final Iterator<ChannelHandlerContext> it = channels.iterator();
63+
if(DEBUG)
64+
log.info("Messaging "+channels.size()+" channels.");
6365
while(it.hasNext()) {
6466
final ChannelHandlerContext ctx = it.next();
6567
//final CountDownLatch cdl = new CountDownLatch(1);
@@ -70,7 +72,7 @@ public void loop() throws InterruptedException {
7072
if( ctx.isReady() && sendMessage ) {
7173
try {
7274
if( DEBUG )
73-
log.info("Outgoing queue writing:"+message+" to "+ctx);
75+
log.info("Outgoing queue size="+deque.length()+" writing:"+message+" to "+ctx+" from "+Thread.currentThread().getName());
7476
ctx.write(message);
7577
} catch (IOException e) {
7678
log.info("Closing failed write context:"+ctx+" due to "+e);

0 commit comments

Comments
 (0)