Skip to content

Commit 4ba23b1

Browse files
committed
Removing Netty, streamline
1 parent 5c6b1a0 commit 4ba23b1

File tree

54 files changed

+1830
-863
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1830
-863
lines changed

src/main/java/build.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
<!-- Compiles the java code (including the usage of library for JUnit -->
2828
<target name="compile" depends="makedir">
29-
<javac srcdir="${src.dir}" destdir="${build.dir}" classpath="${lib.dir}/netty.Final.jar:${lib.dir}/commons-pool-1.6.jar:${lib.dir}/commons-lang-2.6.jar:${lib.dir}/ws-commons-util-1.0.1.jar:${lib.dir}/en_us.jar:${lib.dir}/commons-digester-1.8.1.jar:${lib.dir}/commons-logging-1.1.2.jar:${lib.dir}/commons-io-2.4.jar:${lib.dir}/RosBase.jar:${lib.dir}/commons-net-1.4.1.jar:${lib.dir}/RosMsgs.jar">
29+
<javac srcdir="${src.dir}" destdir="${build.dir}" classpath="${lib.dir}/commons-pool-1.6.jar:${lib.dir}/commons-lang-2.6.jar:${lib.dir}/ws-commons-util-1.0.1.jar:${lib.dir}/en_us.jar:${lib.dir}/commons-digester-1.8.1.jar:${lib.dir}/commons-logging-1.1.2.jar:${lib.dir}/commons-io-2.4.jar:${lib.dir}/RosBase.jar:${lib.dir}/commons-net-1.4.1.jar:${lib.dir}/RosMsgs.jar">
3030
</javac>
3131

3232
</target>

src/main/java/org/ros/internal/node/DefaultNode.java

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,5 @@
1-
/*
2-
* Copyright (C) 2011 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node;
182

19-
203
import org.apache.commons.logging.Log;
214
import org.apache.commons.logging.LogFactory;
225
import org.ros.Parameters;
@@ -66,8 +49,6 @@
6649
import org.ros.time.ClockTopicTimeProvider;
6750
import org.ros.time.TimeProvider;
6851

69-
import io.netty.channel.nio.NioEventLoop;
70-
7152
import java.io.IOException;
7253
import java.net.InetSocketAddress;
7354
import java.net.URI;
@@ -79,14 +60,10 @@
7960
/**
8061
* The default implementation of a {@link Node}.
8162
*
82-
* @author ethan.rublee@gmail.com (Ethan Rublee)
83-
* @author kwc@willowgarage.com (Ken Conley)
84-
* @author damonkohler@google.com (Damon Kohler)
8563
*/
8664
public class DefaultNode implements ConnectedNode {
8765
private static final boolean DEBUG = true;
88-
private static final Log log = LogFactory.getLog(TcpRosServer.class);
89-
// TODO(damonkohler): Move to NodeConfiguration.
66+
private static final Log log = LogFactory.getLog(DefaultNode.class);
9067
/**
9168
* The maximum delay before shutdown will begin even if all
9269
* {@link NodeListener}s have not yet returned from their
@@ -97,7 +74,7 @@ public class DefaultNode implements ConnectedNode {
9774

9875
private final NodeConfiguration nodeConfiguration;
9976
private final ListenerGroup<NodeListener> nodeListeners;
100-
private final /*ScheduledExecutorService*/NioEventLoop scheduledExecutorService;
77+
private final ScheduledExecutorService scheduledExecutorService;
10178
private final InetSocketAddress masterUri;
10279
private final MasterClient masterClient;
10380
private final TopicParticipantManager topicParticipantManager;
@@ -126,7 +103,7 @@ public class DefaultNode implements ConnectedNode {
126103
* to this {@link Node} before it starts
127104
*/
128105
public DefaultNode(NodeConfiguration nodeConfiguration, Collection<NodeListener> nodeListeners,
129-
/*ScheduledExecutorService*/NioEventLoop scheduledExecutorService) {
106+
ScheduledExecutorService scheduledExecutorService) {
130107
this.nodeConfiguration = NodeConfiguration.copyOf(nodeConfiguration);
131108
this.nodeListeners = new ListenerGroup<NodeListener>(scheduledExecutorService);
132109
this.nodeListeners.addAll(nodeListeners);
@@ -199,14 +176,18 @@ private void start() {
199176
// During startup, we wait for 1) the RosoutLogger and 2) the TimeProvider.
200177
final CountDownLatch latch = new CountDownLatch(2);
201178

202-
rlog = new RosoutLogger(this);
203-
rlog.getPublisher().addListener(new DefaultPublisherListener<rosgraph_msgs.Log>() {
204-
@Override
205-
public void onMasterRegistrationSuccess(Publisher<rosgraph_msgs.Log> registrant) {
206-
latch.countDown();
207-
}
208-
});
209-
179+
try {
180+
rlog = new RosoutLogger(this);
181+
rlog.getPublisher().addListener(new DefaultPublisherListener<rosgraph_msgs.Log>() {
182+
@Override
183+
public void onMasterRegistrationSuccess(Publisher<rosgraph_msgs.Log> registrant) {
184+
latch.countDown();
185+
}
186+
});
187+
} catch (IOException e1) {
188+
log.error("The Ros Logger has failed to start.",e1);
189+
throw new RuntimeException(e1);
190+
}
210191
boolean useSimTime = false;
211192
try {
212193
useSimTime =
@@ -256,7 +237,11 @@ public <T> Publisher<T> newPublisher(GraphName topicName, String messageType) {
256237
nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
257238
TopicDeclaration topicDeclaration =
258239
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
259-
return publisherFactory.newOrExisting(topicDeclaration);
240+
Publisher<T> publisher = null;
241+
try {
242+
publisher = publisherFactory.newOrExisting(topicDeclaration, slaveServer.getSubscribers());
243+
} catch(IOException e) { throw new RosRuntimeException(e); }
244+
return publisher;
260245
}
261246

262247
@Override
@@ -271,7 +256,10 @@ public <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType)
271256
nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
272257
TopicDeclaration topicDeclaration =
273258
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
274-
Subscriber<T> subscriber = subscriberFactory.newOrExisting(topicDeclaration);
259+
Subscriber<T> subscriber = null;
260+
try {
261+
subscriber = subscriberFactory.newOrExisting(topicDeclaration);
262+
} catch(IOException e) { throw new RosRuntimeException(e); }
275263
return subscriber;
276264
}
277265

@@ -328,8 +316,7 @@ public InetSocketAddress lookupServiceUri(String serviceName) {
328316
}
329317

330318
@Override
331-
public <T, S> ServiceClient<T, S> newServiceClient(GraphName serviceName, String serviceType)
332-
throws ServiceNotFoundException {
319+
public <T, S> ServiceClient<T, S> newServiceClient(GraphName serviceName, String serviceType) throws Exception {
333320
GraphName resolvedServiceName = resolveName(serviceName);
334321
InetSocketAddress uri = lookupServiceUri(resolvedServiceName);
335322
if (uri == null) {
@@ -344,8 +331,7 @@ public <T, S> ServiceClient<T, S> newServiceClient(GraphName serviceName, String
344331
}
345332

346333
@Override
347-
public <T, S> ServiceClient<T, S> newServiceClient(String serviceName, String serviceType)
348-
throws ServiceNotFoundException {
334+
public <T, S> ServiceClient<T, S> newServiceClient(String serviceName, String serviceType) throws Exception {
349335
return newServiceClient(GraphName.of(serviceName), serviceType);
350336
}
351337

src/main/java/org/ros/internal/node/RosoutLogger.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.ros.Topics;
2222
import org.ros.node.topic.Publisher;
2323

24+
import java.io.IOException;
2425
import java.io.PrintWriter;
2526
import java.io.StringWriter;
2627

@@ -36,7 +37,7 @@ class RosoutLogger implements Log {
3637
private final Publisher<rosgraph_msgs.Log> publisher;
3738
private final Log log;
3839

39-
public RosoutLogger(DefaultNode defaultNode) {
40+
public RosoutLogger(DefaultNode defaultNode) throws IOException {
4041
this.defaultNode = defaultNode;
4142
publisher = defaultNode.newPublisher(Topics.ROSOUT, rosgraph_msgs.Log._TYPE);
4243
log = LogFactory.getLog(defaultNode.getName().toString());

src/main/java/org/ros/internal/node/server/SlaveServer.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,3 @@
1-
/*
2-
* Copyright (C) 2011 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.server;
182

193
import org.ros.address.AdvertiseAddress;
@@ -28,15 +12,13 @@
2812
import org.ros.internal.node.topic.TopicDeclaration;
2913
import org.ros.internal.node.topic.TopicParticipantManager;
3014
import org.ros.internal.system.Process;
15+
import org.ros.internal.transport.ChannelHandlerContext;
3116
import org.ros.internal.transport.ProtocolDescription;
3217
import org.ros.internal.transport.ProtocolNames;
3318
import org.ros.internal.transport.tcp.TcpRosProtocolDescription;
3419
import org.ros.internal.transport.tcp.TcpRosServer;
3520
import org.ros.namespace.GraphName;
3621

37-
import io.netty.channel.EventLoop;
38-
import io.netty.channel.nio.NioEventLoopGroup;
39-
4022
import java.io.IOException;
4123
import java.net.InetSocketAddress;
4224
import java.util.ArrayList;
@@ -45,7 +27,7 @@
4527
import java.util.concurrent.ScheduledExecutorService;
4628

4729
/**
48-
* @author damonkohler@google.com (Damon Kohler)
30+
* @author jg
4931
*/
5032
public class SlaveServer extends RpcServer {
5133

@@ -71,15 +53,21 @@ public SlaveServer(GraphName nodeName, BindAddress tcpRosBindAddress,
7153
} catch (ClassNotFoundException e) {
7254
throw new IOException(e);
7355
}
74-
EventLoop exec = new NioEventLoopGroup().next();
56+
7557
this.tcpRosServer =
76-
new TcpRosServer(tcpRosBindAddress, tcpRosAdvertiseAddress, topicParticipantManager, serviceManager, exec/*utorService*/);
58+
new TcpRosServer(tcpRosBindAddress, tcpRosAdvertiseAddress, topicParticipantManager, serviceManager, executorService);
7759
}
7860

7961
public AdvertiseAddress getTcpRosAdvertiseAddress() {
8062
return tcpRosServer.getAdvertiseAddress();
8163
}
82-
64+
/**
65+
* Return the ChannelHandlerContext array of subscribers
66+
* @return
67+
*/
68+
public List<ChannelHandlerContext> getSubscribers() {
69+
return tcpRosServer.getSubscribers();
70+
}
8371
/**
8472
* Start the RPC server. This start() routine requires that the
8573
* {@link TcpRosServer} is initialized first so that the slave server returns

src/main/java/org/ros/internal/node/service/DefaultServiceClient.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
package org.ros.internal.node.service;
33

44

5-
//import org.jboss.netty.buffer.ChannelBuffer;
65
import org.ros.exception.RosRuntimeException;
76
import org.ros.internal.message.MessageBufferPool;
87
import org.ros.internal.system.Utility;
@@ -16,11 +15,9 @@
1615
import org.ros.node.service.ServiceClient;
1716
import org.ros.node.service.ServiceResponseListener;
1817

19-
import io.netty.buffer.ByteBuf;
20-
import io.netty.util.concurrent.EventExecutor;
21-
18+
import java.io.IOException;
2219
import java.net.InetSocketAddress;
23-
import java.net.URI;
20+
import java.nio.ByteBuffer;
2421
import java.util.LinkedList;
2522
import java.util.Queue;
2623
import java.util.concurrent.CountDownLatch;
@@ -84,12 +81,12 @@ public void reset() {
8481
public static <S, T> DefaultServiceClient<S, T> newDefault(GraphName nodeName,
8582
ServiceDeclaration serviceDeclaration,
8683
MessageFactory messageFactory,
87-
ScheduledExecutorService executorService) {
84+
ScheduledExecutorService executorService) throws IOException {
8885
return new DefaultServiceClient<S, T>(nodeName, serviceDeclaration, messageFactory, executorService);
8986
}
9087

9188
private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDeclaration,
92-
MessageFactory messageFactory, ScheduledExecutorService executorService) {
89+
MessageFactory messageFactory, ScheduledExecutorService executorService) throws IOException {
9390
this.serviceDeclaration = serviceDeclaration;
9491
this.messageFactory = messageFactory;
9592
messageBufferPool = new MessageBufferPool();
@@ -99,7 +96,7 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla
9996
// TODO(damonkohler): Support non-persistent connections.
10097
connectionHeader.addField(ConnectionHeaderFields.PERSISTENT, "1");
10198
connectionHeader.merge(serviceDeclaration.toConnectionHeader());
102-
tcpClientManager = new TcpClientManager((EventExecutor) executorService);
99+
tcpClientManager = new TcpClientManager(executorService);
103100
ServiceClientHandshakeHandler<T, S> serviceClientHandshakeHandler =
104101
new ServiceClientHandshakeHandler<T, S>(connectionHeader, responseListeners, executorService);
105102
handshakeLatch = new HandshakeLatch();
@@ -108,12 +105,16 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla
108105
}
109106

110107
@Override
111-
public void connect(InetSocketAddress inetSocketAddress) {
108+
public void connect(InetSocketAddress inetSocketAddress) throws Exception {
112109
assert(inetSocketAddress != null) : "Address must be specified.";
113110
//assert(inetSocketAddress.getScheme().equals("rosrpc")) : "Invalid service URI.";
114111
assert(tcpClient == null) : "Already connected once.";
115112
handshakeLatch.reset();
116-
tcpClient = tcpClientManager.connect(toString(), inetSocketAddress);
113+
try {
114+
tcpClient = tcpClientManager.connect(toString(), inetSocketAddress);
115+
} catch (IOException e1) {
116+
throw new RosRuntimeException("TcpClient failed to connect to remote "+toString()+" "+inetSocketAddress, e1);
117+
}
117118
try {
118119
if (!handshakeLatch.await(1, TimeUnit.SECONDS)) {
119120
throw new RosRuntimeException(handshakeLatch.getErrorMessage());
@@ -131,11 +132,11 @@ public void shutdown() {
131132

132133
@Override
133134
public void call(T request, ServiceResponseListener<S> listener) {
134-
ByteBuf buffer = messageBufferPool.acquire();
135+
ByteBuffer buffer = messageBufferPool.acquire();
135136
// serialize request to buffer
136137
Utility.serialize(request, buffer);
137138
responseListeners.add(listener);
138-
tcpClient.write(buffer).awaitUninterruptibly();
139+
tcpClient.write(buffer);
139140
messageBufferPool.release(buffer);
140141
}
141142

src/main/java/org/ros/internal/node/service/DefaultServiceServer.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,3 @@
1-
/*
2-
* Copyright (C) 2011 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.service;
182

193

@@ -26,6 +10,7 @@
2610
import org.ros.concurrent.SignalRunnable;
2711
import org.ros.internal.message.service.ServiceDescription;
2812
import org.ros.internal.node.topic.DefaultPublisher;
13+
import org.ros.internal.transport.ChannelHandler;
2914
import org.ros.internal.transport.ConnectionHeader;
3015
import org.ros.internal.transport.ConnectionHeaderFields;
3116
import org.ros.message.MessageFactory;
@@ -35,11 +20,9 @@
3520
import org.ros.node.service.ServiceServer;
3621
import org.ros.node.service.ServiceServerListener;
3722

38-
import io.netty.buffer.ByteBuf;
39-
import io.netty.channel.ChannelHandler;
40-
4123
import java.net.InetSocketAddress;
4224
import java.net.URI;
25+
import java.nio.ByteBuffer;
4326
import java.util.concurrent.ScheduledExecutorService;
4427

4528
/**
@@ -92,7 +75,7 @@ public void onMasterUnregistrationFailure(ServiceServer<T, S> registrant) {
9275
});
9376
}
9477

95-
public ByteBuf finishHandshake(ConnectionHeader incomingConnectionHeader) {
78+
public ByteBuffer finishHandshake(ConnectionHeader incomingConnectionHeader) {
9679
if (DEBUG) {
9780
log.info("Client handshake header: " + incomingConnectionHeader);
9881
}

0 commit comments

Comments
 (0)