Skip to content

Commit 9ced957

Browse files
committed
Tightened up message reception fault handling, decreased incoming circular queue size. Cleaned up ChannelHandlerContext docs.
1 parent d6ddeda commit 9ced957

File tree

9 files changed

+75
-114
lines changed

9 files changed

+75
-114
lines changed

src/main/java/org/ros/concurrent/CircularBlockingDeque.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ public T peekLast() {
183183
public boolean isEmpty() {
184184
return length == 0;
185185
}
186+
187+
public int length() { return length; }
186188

187189
/**
188190
* Returns an iterator over the queue.

src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import org.apache.commons.logging.Log;
44
import org.apache.commons.logging.LogFactory;
5+
56
import org.ros.concurrent.ListenerGroup;
67
import org.ros.concurrent.SignalRunnable;
7-
import org.ros.exception.RosRuntimeException;
88
import org.ros.internal.node.server.NodeIdentifier;
99
import org.ros.internal.transport.ProtocolNames;
1010
import org.ros.internal.transport.queue.IncomingMessageQueue;
@@ -128,12 +128,7 @@ public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddr
128128
if (knownPublishers.contains(publisherIdentifier)) {
129129
return;
130130
}
131-
try {
132-
tcpClientManager.connect(toString(), address);
133-
} catch (IOException e) {
134-
log.error("Failure attempting to add publisher "+toString()+" "+address);
135-
throw new RosRuntimeException(e);
136-
}
131+
tcpClientManager.connect(toString(), address);
137132
// TODO(damonkohler): knownPublishers is duplicate information that is
138133
// already available to the TopicParticipantManager.
139134
knownPublishers.add(publisherIdentifier);

src/main/java/org/ros/internal/transport/ChannelHandlerContext.java

Lines changed: 36 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,18 @@
99

1010
import java.util.Set;
1111
import java.util.concurrent.Executor;
12-
import java.util.concurrent.Future;
1312

1413
import org.ros.internal.transport.tcp.ChannelGroup;
1514

15+
/**
16+
* This is the ChannelHandlerContext that links the underlying TCP Socket 'channel' to the {@link ChannelPipeline} and the {@link ChannelGroup}
17+
* and provides access to the {@link Executor} to spin up event dispatcher.
18+
* @author jg (C) NeoCoreTechs 2017
19+
*
20+
*/
1621
public interface ChannelHandlerContext {
17-
/**
18-
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
22+
/**
23+
* Return the {@link Socket} which is bound to the {@link ChannelHandlerContext}.
1924
*/
2025
Socket channel();
2126

@@ -33,99 +38,59 @@ public interface ChannelHandlerContext {
3338
String name();
3439

3540
/**
36-
* Request to bind to the given {@link SocketAddress} and notify the {@link Future} once the operation
37-
* completes, either because the operation was successful or because of an error.
41+
* Request to bind to the given {@link SocketAddress}
3842
* <p>
39-
* This will result in having the
40-
* {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, CompletionHandler)} method
41-
* called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
42-
* {@link Channel}.
43+
* This will result in having the socket bound to the local address.
4344
*/
4445
void bind(SocketAddress localAddress) throws IOException;
4546

4647
/**
47-
* Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
48-
* completes, either because the operation was successful or because of an error.
49-
* <p>
50-
* If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with
51-
* a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException}
52-
* will be used.
48+
* Request to connect to the given {@link SocketAddress}.
5349
* <p>
54-
* This will result in having the
55-
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)}
56-
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
57-
* {@link Channel}.
50+
* If the connection fails because of a connection timeout, the exception will be thrown
51+
* This will result in having the socket connected and the input and output streams initialized.
5852
* @throws IOException
5953
*/
6054
void connect(SocketAddress remoteAddress) throws IOException;
6155

6256
/**
63-
* Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
64-
* {@link Future} once the operation completes, either because the operation was successful or because of
65-
* an error.
66-
* <p>
67-
* This will result in having the
68-
* {@link ChannelHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)}
69-
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
70-
* {@link Channel}.
57+
* Request to connect to the given remote {@link SocketAddress} while bind to the localAddress.
58+
* This will result in having the socket bound and streams ready.
7159
* @throws IOException
7260
*/
7361
void connect(SocketAddress remoteAddress, SocketAddress localAddress) throws IOException;
7462

75-
76-
7763
/**
78-
* Request to disconnect from the remote peer and notify the {@link Future} once the operation completes,
79-
* either because the operation was successful or because of an error.
80-
* <p>
81-
* This will result in having the
82-
* {@link ChannelHandler#disconnect(ChannelHandlerContext, CompletionHandler)}
83-
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
84-
* {@link Channel}.
64+
* Request to disconnect from the remote peer.
65+
* This will result in having the Socket closed.
8566
* @throws IOException
8667
*/
8768
void disconnect() throws IOException;
8869

8970
/**
90-
* Request to close the {@link Channel} and notify the {@link Future} once the operation completes,
91-
* either because the operation was successful or because of
92-
* an error.
93-
*
71+
* Request to close the {@link Channel}.
9472
* After it is closed it is not possible to reuse it again.
95-
* <p>
96-
* This will result in having the
97-
* {@link ChannelHandler#close(ChannelHandlerContext, CompletionHandler)}
98-
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
99-
* {@link Channel}.
73+
* This will result in having the {@link Socket} closed.
10074
* @throws IOException
10175
*/
10276
void close() throws IOException;
10377

104-
10578
/**
106-
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
107-
* {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was
108-
* read, and triggers a
109-
* {@link ChannelHandler#channelReadComplete(ChannelHandlerContext) channelReadComplete} event so the
110-
* handler can decide to continue reading. If there's a pending read operation already, this method does nothing.
111-
* <p>
112-
* This will result in having the
113-
* {@link ChannelHandler#read(ChannelHandlerContext)}
114-
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
115-
* {@link Channel}.
116-
* @throws IOException
79+
* Request to Read data from the {@link InputStream} into the first inbound buffer.
80+
* It is up to the client, i.e. {@link AsynchTCPWorker}, to trigger a read event if data was
81+
* read, and it does this through the pipeline {@link ChannelPipeline#fireChannelRead(Object)} and
82+
* triggers an event through the pipeline via the {@link ChannelPipeline#fireChannelReadComplete()}
83+
* if successful. If there's a pending read operation already, this method does nothing.
84+
* @throws IOException Generates {@link ChannelPipeline#fireExceptionCaught(Throwable)}
11785
*/
11886
Object read() throws IOException;
11987

12088
/**
12189
* Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
122-
* This method will not request to actual flush, so be sure to call {@link #flush()}
123-
* once you want to request to flush all pending data to the actual transport.
12490
* @throws IOException
12591
*/
12692
void write(Object msg) throws IOException;
12793

128-
12994
/**
13095
* Return the assigned {@link ChannelPipeline}
13196
*/
@@ -164,10 +129,20 @@ public interface ChannelHandlerContext {
164129
*/
165130
Set<String> getMessageTypes();
166131

132+
/**
133+
* Write with the named {@link CompletionHandler}
134+
* @param msg
135+
* @param handler
136+
*/
167137
void write(Object msg, CompletionHandler<Integer, Void> handler);
168138

139+
/**
140+
* Request to Read data from the {@link InputStream} into the first inbound buffer.
141+
* <p>
142+
* This will result in having the Socket read and {@link CompletionHandler#completed(Object, Object)}
143+
* On IOException {@link CompletionHandler#failed(Throwable, Object)}
144+
*/
169145
Object read(CompletionHandler<Integer, Void> handler);
170146

171147

172-
173148
}

src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
3939
boolean ready = false;
4040
Object mutex = new Object();
4141
Set<String> outboundMessageTypes;
42-
43-
42+
InputStream is = null;
43+
OutputStream os = null;
44+
4445
public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/Socket channel2, Executor exc) {
4546
channelGroup = channelGroup2;
4647
channel = channel2;
@@ -73,6 +74,9 @@ public String name() {
7374
@Override
7475
public void connect(SocketAddress remoteAddress) throws IOException {
7576
channel.connect(remoteAddress);
77+
is = channel.getInputStream();
78+
os = channel.getOutputStream();
79+
7680
}
7781

7882
@Override
@@ -95,7 +99,6 @@ public void close() throws IOException {
9599

96100
@Override
97101
public Object read() throws IOException {
98-
InputStream is = channel.getInputStream();
99102
ObjectInputStream ois = new ObjectInputStream(is);
100103
try {
101104
return ois.readObject();
@@ -106,7 +109,6 @@ public Object read() throws IOException {
106109

107110
@Override
108111
public void write(Object msg) throws IOException {
109-
OutputStream os = channel.getOutputStream();
110112
ObjectOutputStream oos = new ObjectOutputStream(os);
111113
oos.writeObject(msg);
112114
oos.flush();

src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
/**
1111
* Created in default subscriber to handle the incoming messages.
1212
* Creates a MessageReceiver and a MessageDispatcher.
13+
* The MessageDispatcher is spun up by the ExecutorService.
1314
* @author jg
1415
*/
1516
public class IncomingMessageQueue<T> {
@@ -18,17 +19,17 @@ public class IncomingMessageQueue<T> {
1819
* The maximum number of incoming messages that will be queued.
1920
* <p>
2021
* This limit applies to dispatching {@link LazyMessage}s as they arrive over
21-
* the network. It is independent of {@link MessageDispatcher} queue
22-
* capacities specified by
22+
* the network. It is independent of {@link MessageDispatcher} queue capacities specified by
2323
* {@link IncomingMessageQueue#addListener(MessageListener, int)} which are
2424
* consumed by user provided {@link MessageListener}s.
25+
* @author Groff (C) NeoCoreTechs 2017
2526
*/
26-
private static final int DEQUE_CAPACITY = 8192;
27+
private static final int DEQUE_CAPACITY = 256;
2728

2829
private final MessageReceiver<T> messageReceiver;
2930
private final MessageDispatcher<T> messageDispatcher;
3031

31-
public IncomingMessageQueue( ExecutorService executorService) {
32+
public IncomingMessageQueue(ExecutorService executorService) {
3233
CircularBlockingDeque<T> lazyMessages =
3334
new CircularBlockingDeque<T>(DEQUE_CAPACITY);
3435
messageReceiver = new MessageReceiver<T>(lazyMessages);

src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
import java.util.concurrent.ExecutorService;
1313

1414
/**
15-
* @author jg
16-
*
17-
* @param <T>
18-
* the message type
15+
* The IncomingMessageQueue creates this and spins it up via the ExecutorService.
16+
* It shares the CircularBlockingDeque with the MessageReceiver.
17+
* It services the MessageListeners with received messages from the queue.
18+
* @param <T> the message type
19+
* @author jg (C) NeoCoreTechs 2017
1920
*/
2021
public class MessageDispatcher<T> extends CancellableLoop {
2122

@@ -34,8 +35,7 @@ public class MessageDispatcher<T> extends CancellableLoop {
3435
private boolean latchMode;
3536
private T latchedMessage;
3637

37-
public MessageDispatcher(CircularBlockingDeque<T> lazyMessages,
38-
ExecutorService executorService) {
38+
public MessageDispatcher(CircularBlockingDeque<T> lazyMessages, ExecutorService executorService) {
3939
this.lazyMessages = lazyMessages;
4040
messageListeners = new ListenerGroup<MessageListener<T>>(executorService);
4141
mutex = new Object();
@@ -55,8 +55,7 @@ public void addListener(MessageListener<T> messageListener, int limit) {
5555
log.info("Adding listener.");
5656
}
5757
synchronized (mutex) {
58-
EventDispatcher<MessageListener<T>> eventDispatcher =
59-
messageListeners.add(messageListener, limit);
58+
EventDispatcher<MessageListener<T>> eventDispatcher = messageListeners.add(messageListener, limit);
6059
if (latchMode && latchedMessage != null) {
6160
eventDispatcher.signal(newSignalRunnable(latchedMessage));
6261
}
@@ -67,8 +66,7 @@ public void addListener(MessageListener<T> messageListener, int limit) {
6766
* Returns a newly allocated {@link SignalRunnable} for the specified
6867
* {@link LazyMessage}.
6968
*
70-
* @param lazyMessage
71-
* the {@link LazyMessage} to signal {@link MessageListener}s with
69+
* @param lazyMessage the {@link LazyMessage} to signal {@link MessageListener}s with
7270
* @return the newly allocated {@link SignalRunnable}
7371
*/
7472
private SignalRunnable<MessageListener<T>> newSignalRunnable(final T lazyMessage) {
@@ -81,9 +79,7 @@ public void run(MessageListener<T> messageListener) {
8179
}
8280

8381
/**
84-
* @param enabled
85-
* {@code true} if latch mode should be enabled, {@code false}
86-
* otherwise
82+
* @param enabled {@code true} if latch mode should be enabled, {@code false} otherwise
8783
*/
8884
public void setLatchMode(boolean enabled) {
8985
latchMode = enabled;

0 commit comments

Comments
 (0)