Skip to content

Commit af06002

Browse files
committed
Using blocking stream IO instead of NIO channels.
1 parent d5e67bc commit af06002

29 files changed

+223
-429
lines changed

src/main/java/org/ros/internal/node/DefaultNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public void run() {
167167
}
168168
});
169169
}
170-
170+
171171
private void start() {
172172
// The Registrar must be started first so that master registration is
173173
// possible during startup.

src/main/java/org/ros/internal/node/client/RemoteClient.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,14 @@
1111
import java.net.SocketException;
1212
import java.util.concurrent.ArrayBlockingQueue;
1313

14-
15-
16-
1714
import org.apache.commons.logging.Log;
1815
import org.apache.commons.logging.LogFactory;
19-
import org.ros.internal.node.response.VoidResultFactory;
2016
import org.ros.internal.node.server.RemoteRequestInterface;
2117
import org.ros.internal.node.server.ThreadPoolManager;
22-
import org.ros.internal.transport.tcp.TcpRosServer;
18+
2319

2420
/**
25-
* This class functions as client to the remote node.
21+
* This class functions as client to the remote node.
2622
* @author jg
2723
* Copyright (C) NeoCoreTechs 2014,2015
2824
*/
@@ -40,21 +36,12 @@ public class RemoteClient implements Runnable {
4036
private boolean shouldRun = true; // master service thread control
4137
private Object waitHalt = new Object();
4238

43-
private VoidResultFactory handleNull = new VoidResultFactory();
44-
4539
ArrayBlockingQueue<RemoteRequestInterface> requests = new ArrayBlockingQueue<RemoteRequestInterface>(1);
4640
ArrayBlockingQueue<Object> responses = new ArrayBlockingQueue<Object>(1);
4741

4842

4943
/**
50-
* Start a relatrix client. Contact the boot time portion of server and queue a CommandPacket to open the desired
51-
* database and get back the master and slave ports of the remote server. The main client thread then
52-
* contacts the server master port, and the remote slave port contacts the master of the client. A WorkerRequestProcessor
53-
* thread is created to handle the processing of payloads and a comm thread handles the bidirectional traffic to server
54-
* @param dbName The name of the remote DB in full qualified path form
55-
* @param remote The remote database name for cluster server
56-
* @param bootNode The name of the remote server host
57-
* @param bootPort Then name of the remote host port on which RelatrixServer is running
44+
* Start a remote client.
5845
*/
5946
public RemoteClient(String bootNode, int bootPort) throws IOException {
6047
if( TEST ) {

src/main/java/org/ros/internal/node/server/SlaveServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.ArrayList;
2525
import java.util.Collection;
2626
import java.util.List;
27+
import java.util.concurrent.ArrayBlockingQueue;
2728
import java.util.concurrent.ScheduledExecutorService;
2829

2930
/**
@@ -65,7 +66,7 @@ public AdvertiseAddress getTcpRosAdvertiseAddress() {
6566
* Return the ChannelHandlerContext array of subscribers
6667
* @return
6768
*/
68-
public List<ChannelHandlerContext> getSubscribers() {
69+
public ArrayBlockingQueue<ChannelHandlerContext> getSubscribers() {
6970
return tcpRosServer.getSubscribers();
7071
}
7172
/**

src/main/java/org/ros/internal/node/service/DefaultServiceClient.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,17 @@ public void shutdown() {
131131

132132
@Override
133133
public void call(T request, ServiceResponseListener<S> listener) {
134-
ByteBuffer buffer = messageBufferPool.acquire();
134+
//ByteBuffer buffer = messageBufferPool.acquire();
135135
// serialize request to buffer
136-
Utility.serialize(request, buffer);
136+
//Utility.serialize(request, buffer);
137137
responseListeners.add(listener);
138-
tcpClient.write(buffer);
139-
messageBufferPool.release(buffer);
138+
try {
139+
tcpClient.getContext().write(request);
140+
} catch (IOException e) {
141+
142+
e.printStackTrace();
143+
}
144+
//messageBufferPool.release(buffer);
140145
}
141146

142147
@Override

src/main/java/org/ros/internal/node/service/ServiceClientHandshakeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public String getName() {
6666
@Override
6767
public void handlerAdded(ChannelHandlerContext arg0) throws Exception {
6868
if( DEBUG )
69-
log.debug("Handler added "+arg0);
69+
log.debug(this+" Handler added "+arg0);
7070

7171
}
7272

src/main/java/org/ros/internal/node/topic/DefaultPublisher.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.nio.ByteBuffer;
2222
import java.nio.channels.Channel;
2323
import java.util.List;
24+
import java.util.concurrent.ArrayBlockingQueue;
2425
import java.util.concurrent.ScheduledExecutorService;
2526
import java.util.concurrent.TimeUnit;
2627

@@ -55,15 +56,15 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ
5556
private final NodeIdentifier nodeIdentifier;
5657
private final MessageFactory messageFactory;
5758
// List of channelHandlerContexts that represent subscribers connected through publisher socket
58-
private final List<ChannelHandlerContext> subscribers;
59+
private final ArrayBlockingQueue<ChannelHandlerContext> subscribers;
5960

6061
public DefaultPublisher(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration,
61-
MessageFactory messageFactory, ScheduledExecutorService executorService, List<ChannelHandlerContext> subscribers) throws IOException {
62+
MessageFactory messageFactory, ScheduledExecutorService executorService, ArrayBlockingQueue<ChannelHandlerContext> arrayBlockingQueue) throws IOException {
6263
super(topicDeclaration);
6364
this.nodeIdentifier = nodeIdentifier;
6465
this.messageFactory = messageFactory;
65-
this.subscribers = subscribers;
66-
outgoingMessageQueue = new OutgoingMessageQueue<T>(executorService, subscribers);
66+
this.subscribers = arrayBlockingQueue;
67+
outgoingMessageQueue = new OutgoingMessageQueue<T>(executorService, arrayBlockingQueue);
6768
listeners = new ListenerGroup<PublisherListener<T>>(executorService);
6869
listeners.add(new DefaultPublisherListener<T>() {
6970
@Override
@@ -147,7 +148,8 @@ public void publish(T message) {
147148
*
148149
* @return encoded connection header from subscriber
149150
*/
150-
public ByteBuffer finishHandshake(ConnectionHeader incomingHeader) {
151+
//public ByteBuffer finishHandshake(ConnectionHeader incomingHeader) {
152+
public ConnectionHeader finishHandshake(ConnectionHeader incomingHeader) {
151153
ConnectionHeader topicDefinitionHeader = getTopicDeclarationHeader();
152154
if (DEBUG) {
153155
//log.info("Subscriber handshake header: " + incomingHeader);
@@ -171,9 +173,16 @@ public ByteBuffer finishHandshake(ConnectionHeader incomingHeader) {
171173
// TODO(damonkohler): Force latch mode to be consistent throughout the life
172174
// of the publisher.
173175
outgoingConnectionHeader.addField(ConnectionHeaderFields.LATCHING, getLatchMode() ? "1" : "0");
176+
return outgoingConnectionHeader;
177+
/*
174178
ByteBuffer buffer = MessageBuffers.dynamicBuffer();
179+
buffer.position(4);
175180
Utility.serialize(outgoingConnectionHeader, buffer);
181+
buffer.rewind();
182+
buffer.putInt(buffer.limit()-4); // put the TLV size
183+
buffer.position(buffer.limit());
176184
return buffer;
185+
*/
177186
}
178187

179188
/**

src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddr
129129
return;
130130
}
131131
try {
132-
133132
tcpClientManager.connect(toString(), address);
134133
} catch (IOException e) {
135134
log.error("Failure attempting to add publisher "+toString()+" "+address);

src/main/java/org/ros/internal/node/topic/PublisherFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import java.io.IOException;
1111
import java.util.List;
12+
import java.util.concurrent.ArrayBlockingQueue;
1213
import java.util.concurrent.ScheduledExecutorService;
1314

1415
/**
@@ -41,21 +42,21 @@ public PublisherFactory(NodeIdentifier nodeIdentifier,
4142
* the message type associated with the {@link Publisher}
4243
* @param topicDeclaration
4344
* {@link TopicDeclaration} that is being published
44-
* @param subscribers
45+
* @param arrayBlockingQueue
4546
* @param messageSerializer
4647
* the {@link MessageSerializer} used for published messages
4748
* @return a new or cached {@link Publisher} instance
4849
* @throws IOException
4950
*/
5051
@SuppressWarnings("unchecked")
51-
public <T> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration, List<ChannelHandlerContext> subscribers) throws IOException {
52+
public <T> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration, ArrayBlockingQueue<ChannelHandlerContext> arrayBlockingQueue) throws IOException {
5253
GraphName topicName = topicDeclaration.getName();
5354
synchronized (mutex) {
5455
if (topicParticipantManager.hasPublisher(topicName)) {
5556
return (DefaultPublisher<T>) topicParticipantManager.getPublisher(topicName);
5657
} else {
5758
DefaultPublisher<T> publisher =
58-
new DefaultPublisher<T>(nodeIdentifier, topicDeclaration, messageFactory, executorService, subscribers);
59+
new DefaultPublisher<T>(nodeIdentifier, topicDeclaration, messageFactory, executorService, arrayBlockingQueue);
5960
publisher.addListener(new DefaultPublisherListener<T>() {
6061
@Override
6162
public void onNewSubscriber(Publisher<T> publisher, SubscriberIdentifier subscriberIdentifier) {

src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
import org.ros.internal.transport.ConnectionHeader;
99
import org.ros.internal.transport.ConnectionHeaderFields;
1010
import org.ros.internal.transport.queue.IncomingMessageQueue;
11-
import org.ros.internal.transport.tcp.AsynchTCPWorker;
12-
import org.ros.internal.transport.tcp.AsynchTempTCPWorker;
1311
import org.ros.internal.transport.tcp.NamedChannelHandler;
1412
import org.ros.node.topic.Publisher;
1513
import org.ros.node.topic.Subscriber;
@@ -30,7 +28,7 @@
3028
* the {@link Subscriber} may only subscribe to messages of this type
3129
*/
3230
class SubscriberHandshakeHandler<T> extends BaseClientHandshakeHandler {
33-
private static boolean DEBUG = false;
31+
private static boolean DEBUG = true;
3432
private static final Log log = LogFactory.getLog(SubscriberHandshakeHandler.class);
3533

3634
private final IncomingMessageQueue<T> incomingMessageQueue;
@@ -57,17 +55,7 @@ protected void onSuccess(ConnectionHeader incomingConnectionHeader, ChannelHandl
5755
if (latching != null && latching.equals("1")) {
5856
incomingMessageQueue.setLatchMode(true);
5957
}
60-
// the termination of the AsynchTempTCPWorker after successful handshake will set context ready
61-
// Launch the permanent worker
62-
AsynchTCPWorker uworker = null;
63-
try {
64-
uworker = new AsynchTCPWorker(ctx);
65-
} catch (IOException e) {
66-
log.error("Cannot start worker for context:"+ctx);
67-
e.printStackTrace();
68-
return;
69-
}
70-
executor.execute(uworker);
58+
ctx.setReady(true);
7159
}
7260
/**
7361
* Triggered from BaseClientHandshakeHandler

src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
import org.ros.internal.transport.tcp.AbstractNamedChannelHandler;
1010

1111
import java.io.IOException;
12+
import java.io.ObjectOutputStream;
13+
import java.io.OutputStream;
1214
import java.nio.ByteBuffer;
1315
import java.nio.channels.CompletionHandler;
16+
import java.nio.channels.SocketChannel;
1417
import java.util.concurrent.ExecutorService;
1518

1619
/**
@@ -20,7 +23,7 @@
2023
*
2124
*/
2225
public abstract class BaseClientHandshakeHandler extends AbstractNamedChannelHandler {
23-
protected static boolean DEBUG = false;
26+
protected static boolean DEBUG = true;
2427
private static final Log log = LogFactory.getLog(BaseClientHandshakeHandler.class);
2528
private final ClientHandshake clientHandshake;
2629
private final ListenerGroup<ClientHandshakeListener> clientHandshakeListeners;
@@ -40,21 +43,32 @@ public void addListener(ClientHandshakeListener clientHandshakeListener) {
4043
}
4144

4245
@Override
43-
public void channelActive(ChannelHandlerContext ctx) throws Exception {
44-
ByteBuffer bb = MessageBuffers.dynamicBuffer();
46+
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
47+
//final ByteBuffer bb = MessageBuffers.dynamicBuffer();
48+
if( DEBUG ) {
49+
log.info("Preparing OutgoingConnectionHeader:"+clientHandshake.getOutgoingConnectionHeader());
50+
}
51+
52+
ctx.write(clientHandshake.getOutgoingConnectionHeader());
53+
54+
if( DEBUG )
55+
log.info("BaseClientHandshakeHandler channelActive "+ctx+" OutgoingConnectionHeader reply to master complete");
56+
/*
4557
Utility.serialize(clientHandshake.getOutgoingConnectionHeader(), bb);
58+
4659
ctx.write(bb, new CompletionHandler<Integer, Void>() {
4760
@Override
4861
public void completed(Integer arg0, Void arg1) {
4962
if( DEBUG )
50-
log.info("BaseClientHandshakeHandler channelActive reply to master complete");
63+
log.info("BaseClientHandshakeHandler channelActive "+ctx+" reply to master complete with "+arg0+" buffer:"+bb);
5164
}
5265
@Override
5366
public void failed(Throwable arg0, Void arg1) {
54-
log.info("BaseClientHandshakeHandler channelActive reply to master failed with:"+arg0);
67+
log.info("BaseClientHandshakeHandler channelActive "+ctx+" reply to master failed with:"+arg0);
5568
}
5669
5770
});
71+
*/
5872
}
5973

6074
@Override

0 commit comments

Comments
 (0)