Skip to content

Commit 19afda9

Browse files
committed
Netty removal. full fucntionality. Partial message issue.
1 parent 9c9d53e commit 19afda9

23 files changed

+292
-286
lines changed

src/main/java/org/ros/internal/node/service/DefaultServiceServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void onMasterUnregistrationFailure(ServiceServer<T, S> registrant) {
7575
});
7676
}
7777

78-
public ByteBuffer finishHandshake(ConnectionHeader incomingConnectionHeader) {
78+
public ConnectionHeader finishHandshake(ConnectionHeader incomingConnectionHeader) {
7979
if (DEBUG) {
8080
log.info("Client handshake header: " + incomingConnectionHeader);
8181
}
@@ -88,7 +88,7 @@ public ByteBuffer finishHandshake(ConnectionHeader incomingConnectionHeader) {
8888
if (DEBUG) {
8989
log.info("Server handshake header: " + connectionHeader);
9090
}
91-
return connectionHeader.encode();
91+
return connectionHeader;
9292
}
9393

9494
@Override

src/main/java/org/ros/internal/node/topic/DefaultPublisher.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,12 @@
1-
/*
2-
* Copyright (C) 2011 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.topic;
182

193
import org.apache.commons.logging.Log;
204
import org.apache.commons.logging.LogFactory;
215
import org.ros.concurrent.ListenerGroup;
226
import org.ros.concurrent.SignalRunnable;
7+
import org.ros.internal.message.MessageBuffers;
238
import org.ros.internal.node.server.NodeIdentifier;
9+
import org.ros.internal.system.Utility;
2410
import org.ros.internal.transport.ChannelHandlerContext;
2511
import org.ros.internal.transport.ConnectionHeader;
2612
import org.ros.internal.transport.ConnectionHeaderFields;
@@ -40,8 +26,13 @@
4026

4127
/**
4228
* Default implementation of a {@link Publisher}.
29+
* An outgoing message queue is constructed to deliver outbound messages.
30+
* A ListenerGroup of PublisherListeners.
31+
* A MessageFactory
32+
* A list of subscribers as ChannelHandlerContexts.
33+
* A DefaultPublisherListener is constructed as a default entry in the list.
4334
*
44-
* @author damonkohler@google.com (Damon Kohler)
35+
* @author jg
4536
*/
4637
public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publisher<T> {
4738

@@ -180,7 +171,9 @@ public ByteBuffer finishHandshake(ConnectionHeader incomingHeader) {
180171
// TODO(damonkohler): Force latch mode to be consistent throughout the life
181172
// of the publisher.
182173
outgoingConnectionHeader.addField(ConnectionHeaderFields.LATCHING, getLatchMode() ? "1" : "0");
183-
return (ByteBuffer) outgoingConnectionHeader.encode();
174+
ByteBuffer buffer = MessageBuffers.dynamicBuffer();
175+
Utility.serialize(outgoingConnectionHeader, buffer);
176+
return buffer;
184177
}
185178

186179
/**

src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
*/
3131
public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Subscriber<T> {
3232

33-
private static final Log log = LogFactory.getLog(DefaultPublisher.class);
33+
private static final Log log = LogFactory.getLog(DefaultSubscriber.class);
3434

3535
/**
3636
* The maximum delay before shutdown will begin even if all

src/main/java/org/ros/internal/node/topic/DefaultTopicParticipant.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,3 @@
1-
/*
2-
* Copyright (C) 2011 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.topic;
182

193
import org.ros.internal.transport.ConnectionHeader;
@@ -23,9 +7,13 @@
237
import java.util.List;
248

259
/**
26-
* Base definition of a {@link TopicSystemState}.
10+
* Abstract class and Base definition of a {@link TopicSystemState}.
11+
* Primarily operates on TopicDeclaration supplied in constructor.
12+
* During handshake, the topic declaration ConnectionHeader and message type are used to set up
13+
* the class of responses to subscriber which is then stored in the ChannelHandlercontext to filter traffic.
14+
* Provides master signaling methods.
2715
*
28-
* @author damonkohler@google.com (Damon Kohler)
16+
* @author jg
2917
*/
3018
public abstract class DefaultTopicParticipant implements TopicParticipant {
3119

src/main/java/org/ros/internal/system/Utility.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,22 @@
55
import java.io.ObjectInputStream;
66
import java.io.ObjectOutputStream;
77
import java.nio.ByteBuffer;
8-
import java.nio.channels.Channels;
9-
import java.nio.channels.ReadableByteChannel;
8+
//import java.nio.channels.Channels;
9+
//import java.nio.channels.ReadableByteChannel;
1010

1111
import org.apache.commons.logging.Log;
1212
import org.apache.commons.logging.LogFactory;
1313
import org.ros.internal.message.field.DirectByteArrayOutputStream;
14-
14+
/**
15+
* Static methods to serialize and deserialize ByteBuffer to/from Object.
16+
* We are using java serialization. Traditional ROS uses XML/RPC so in general
17+
* we are using ROS generated messages, with all the associated ROS fields and formats
18+
* in a Java serialization context. If we need a ROS gateway the bindings should remain straightforward.
19+
* @author jg
20+
*
21+
*/
1522
public class Utility {
16-
private static boolean DEBUG = true;
23+
private static boolean DEBUG = false;
1724
private static final Log log = LogFactory.getLog(Utility.class);
1825
public static <T> void serialize(T value, ByteBuffer buffer) {
1926
//serializer.serialize((Message) value, buffer);
@@ -36,8 +43,6 @@ public static <T> void serialize(T value, ByteBuffer buffer) {
3643
public static Object deserialize(ByteBuffer buffer) {
3744
//return deserializer.deserialize(buffer);
3845
byte[] obuf = buffer.array();
39-
if( DEBUG )
40-
log.info("Deserialize:"+obuf.length);
4146
Object Od = null;
4247
try {
4348
ObjectInputStream s;

src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
import org.apache.commons.logging.LogFactory;
55
import org.ros.concurrent.ListenerGroup;
66
import org.ros.concurrent.SignalRunnable;
7+
import org.ros.internal.message.MessageBuffers;
8+
import org.ros.internal.system.Utility;
79
import org.ros.internal.transport.tcp.AbstractNamedChannelHandler;
810

911
import java.io.IOException;
1012
import java.nio.ByteBuffer;
13+
import java.nio.channels.CompletionHandler;
1114
import java.util.concurrent.ExecutorService;
1215

1316
/**
@@ -35,7 +38,20 @@ public void addListener(ClientHandshakeListener clientHandshakeListener) {
3538

3639
@Override
3740
public void channelActive(ChannelHandlerContext ctx) throws Exception {
38-
ctx.write(clientHandshake.getOutgoingConnectionHeader().encode());
41+
ByteBuffer bb = MessageBuffers.dynamicBuffer();
42+
Utility.serialize(clientHandshake.getOutgoingConnectionHeader(), bb);
43+
ctx.write(bb, new CompletionHandler<Integer, Void>() {
44+
@Override
45+
public void completed(Integer arg0, Void arg1) {
46+
if( DEBUG )
47+
log.info("BaseClientHandshakeHandler channelActive reply to master complete");
48+
}
49+
@Override
50+
public void failed(Throwable arg0, Void arg1) {
51+
log.info("BaseClientHandshakeHandler channelActive reply to master failed with:"+arg0);
52+
}
53+
54+
});
3955
}
4056

4157
@Override
@@ -45,16 +61,15 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
4561

4662
@Override
4763
public Object channelRead(ChannelHandlerContext ctx, Object buff) throws Exception {
48-
ByteBuffer buffer = (ByteBuffer) buff;
49-
ConnectionHeader connectionHeader = ConnectionHeader.decode(buffer);
64+
ConnectionHeader connectionHeader = (ConnectionHeader)buff;
5065
if (clientHandshake.handshake(connectionHeader)) {
5166
onSuccess(connectionHeader, ctx);
5267
signalOnSuccess(connectionHeader);
5368
} else {
5469
onFailure(clientHandshake.getErrorMessage(), ctx);
5570
signalOnFailure(clientHandshake.getErrorMessage());
5671
}
57-
return buffer;
72+
return buff;
5873
}
5974

6075
@Override

src/main/java/org/ros/internal/transport/ChannelHandlerContext.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.nio.channels.AsynchronousSocketChannel;
88
import java.nio.channels.Channel;
99
import java.nio.channels.CompletionHandler;
10+
import java.util.Set;
1011
import java.util.concurrent.Executor;
1112
import java.util.concurrent.Future;
1213

@@ -157,8 +158,24 @@ public interface ChannelHandlerContext {
157158
*/
158159
boolean isReady();
159160

161+
/**
162+
* Set this channel and its context ready or not for traffic.
163+
* @param ready
164+
*/
160165
void setReady(boolean ready);
161166

167+
/**
168+
* Get the Object representing a mutex to use for completion of operation if necessary.
169+
* @return
170+
*/
162171
Object getChannelCompletionMutex();
172+
173+
/**
174+
* Each successive handshake completion will add another message type to this synchronized set.
175+
* This set is used to determine whether a message placed on the outbound queue will be sent to the
176+
* channel in the context.
177+
* @return The synchronized hash set of message type strings
178+
*/
179+
Set<String> getMessageTypes();
163180

164181
}

src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,24 @@
77
import java.nio.channels.AsynchronousSocketChannel;
88
import java.nio.channels.Channel;
99
import java.nio.channels.CompletionHandler;
10+
import java.util.Collections;
11+
import java.util.HashSet;
12+
import java.util.Set;
1013
import java.util.concurrent.Executor;
1114
import java.util.concurrent.Future;
1215

1316
/**
1417
* A handler context contains all the executor, the channel group, the channel, and the pipeline with the handlers.
1518
* There is one channel per context.
16-
* There is one pipeline context.
19+
* There is one pipeline per context.
1720
* There is one executor per group of contexts.
1821
* The pipeline is a stateful collection of handlers that represent the current channel state and means
1922
* of executing functions in the process of connecting, disconnecting, reading, failing etc.
2023
* The pipeline is configured by means of factories that create ChannelInitializers, inserting
2124
* them in order in the pipeline deque.
2225
* The functions of the system move data through the pipeline, triggering the handlers in the sequence they were
2326
* added.
27+
* Traffic is filtered to subscriber channels via the hash set of requested message types
2428
* @author jg
2529
*
2630
*/
@@ -31,13 +35,15 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
3135
ChannelPipeline pipeline;
3236
boolean ready = false;
3337
Object mutex = new Object();
38+
Set<String> outboundMessageTypes;
3439

3540

3641
public ChannelHandlerContextImpl(AsynchronousChannelGroup grp, AsynchronousSocketChannel ch, Executor exc) {
3742
channelGroup = grp;
3843
channel = ch;
3944
executor = exc;
4045
pipeline = new ChannelPipelineImpl(this);
46+
outboundMessageTypes = (Set<String>) new HashSet<String>();
4147
}
4248

4349
public void setChannel(AsynchronousSocketChannel sock) {
@@ -130,11 +136,24 @@ public boolean isReady() {
130136
return ready;
131137
}
132138

139+
/**
140+
* Sets this context ready or not to receive traffic
141+
*/
142+
@Override
133143
public void setReady(boolean ready) { this.ready = ready;}
134144

135145
/**
136146
* Object to synchronize read and write completion for the channel in this context, since we will have
137147
* multiple outbound writers accessing the same channel
138148
*/
139149
public Object getChannelCompletionMutex() { return mutex; }
150+
151+
/**
152+
* Get the type of messages we want to send to the attached subscriber, based on the handshakes
153+
* received.
154+
* @return The HashSet of message type as String
155+
*/
156+
public Set<String> getMessageTypes() { return outboundMessageTypes; }
157+
158+
140159
}

src/main/java/org/ros/internal/transport/ChannelPipelineImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public ChannelPipeline fireChannelRead(Object msg) throws Exception {
264264
Map.Entry<String,ChannelHandler> me = null;
265265
while(it.hasNext()) {
266266
me = it.next();
267-
msg = me.getValue().channelRead(ctx, msg);
267+
me.getValue().channelRead(ctx, msg);
268268
}
269269
return this;
270270
}

0 commit comments

Comments
 (0)