Skip to content

Commit 9c9d53e

Browse files
committed
Alpha full traffic but unfiltered subscriber mssages
1 parent 2532b89 commit 9c9d53e

17 files changed

+261
-280
lines changed

src/main/java/org/ros/internal/node/topic/DefaultPublisher.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,9 @@ public T newMessage() {
143143

144144
@Override
145145
public void publish(T message) {
146-
if (DEBUG) {
147-
log.info(String.format("Publishing message %s on topic %s.", message, getTopicName()));
148-
}
146+
//if (DEBUG) {
147+
// log.info(String.format("Publishing message %s on topic %s.", message, getTopicName()));
148+
//}
149149
outgoingMessageQueue.add(message);
150150
}
151151

@@ -180,7 +180,7 @@ public ByteBuffer finishHandshake(ConnectionHeader incomingHeader) {
180180
// TODO(damonkohler): Force latch mode to be consistent throughout the life
181181
// of the publisher.
182182
outgoingConnectionHeader.addField(ConnectionHeaderFields.LATCHING, getLatchMode() ? "1" : "0");
183-
return outgoingConnectionHeader.encode();
183+
return (ByteBuffer) outgoingConnectionHeader.encode();
184184
}
185185

186186
/**

src/main/java/org/ros/internal/node/topic/SubscriberHandshake.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,3 @@
1-
/*
2-
* Copyright (C) 2011 Google Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
171
package org.ros.internal.node.topic;
182

193
import org.apache.commons.logging.Log;
@@ -24,8 +8,11 @@
248

259
/**
2610
* Handshake logic from the subscriber side of a topic connection.
11+
* The publisher receives the request for fields associated with the topic.
12+
* The subscriber receives the return from the publisher with the available fields and the
13+
* MD5 checksum to verifiy the message and complete the handshake.
2714
*
28-
* @author damonkohler@google.com (Damon Kohler)
15+
* @author jg
2916
*/
3017
public class SubscriberHandshake extends BaseClientHandshake {
3118

src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,12 @@ protected void onSuccess(ConnectionHeader incomingConnectionHeader, ChannelHandl
5353
if (latching != null && latching.equals("1")) {
5454
incomingMessageQueue.setLatchMode(true);
5555
}
56+
ctx.setReady(true);
5657
}
5758

5859
@Override
5960
protected void onFailure(String errorMessage, ChannelHandlerContext ctx) throws IOException {
60-
log.error("Subscriber handshake failed: " + errorMessage);
61+
log.info("Subscriber handshake failed: " + errorMessage);
6162
ctx.close();
6263
}
6364

src/main/java/org/ros/internal/node/topic/UpdatePublisherRunnable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public void run() {
6666
} catch (Exception e) {
6767
// TODO(damonkohler): Retry logic is needed at the RPC layer.
6868
log.error(e);
69+
e.printStackTrace();
6970
}
7071
}
7172
}

src/main/java/org/ros/internal/system/Utility.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
import org.ros.internal.message.field.DirectByteArrayOutputStream;
1414

1515
public class Utility {
16-
private static final Log log = LogFactory.getLog(Utility.class);
16+
private static boolean DEBUG = true;
17+
private static final Log log = LogFactory.getLog(Utility.class);
1718
public static <T> void serialize(T value, ByteBuffer buffer) {
1819
//serializer.serialize((Message) value, buffer);
1920
DirectByteArrayOutputStream dbaos = new DirectByteArrayOutputStream();
@@ -22,8 +23,10 @@ public static <T> void serialize(T value, ByteBuffer buffer) {
2223
oos = new ObjectOutputStream(dbaos);
2324
oos.writeObject(value);
2425
oos.flush();
26+
buffer.clear();
2527
buffer.put(dbaos.getBuf());
2628
oos.close();
29+
buffer.flip();
2730
} catch (IOException e) {
2831
e.printStackTrace();
2932
}
@@ -33,20 +36,24 @@ public static <T> void serialize(T value, ByteBuffer buffer) {
3336
public static Object deserialize(ByteBuffer buffer) {
3437
//return deserializer.deserialize(buffer);
3538
byte[] obuf = buffer.array();
39+
if( DEBUG )
40+
log.info("Deserialize:"+obuf.length);
3641
Object Od = null;
3742
try {
3843
ObjectInputStream s;
3944
ByteArrayInputStream bais = new ByteArrayInputStream(obuf);
40-
ReadableByteChannel rbc = Channels.newChannel(bais);
41-
s = new ObjectInputStream(Channels.newInputStream(rbc));
45+
//ReadableByteChannel rbc = Channels.newChannel(bais);
46+
s = new ObjectInputStream(bais/*Channels.newInputStream(rbc)*/);
4247
Od = s.readObject();
4348
s.close();
4449
bais.close();
45-
rbc.close();
50+
//rbc.close();
4651
} catch (IOException ioe) {
4752
} catch (ClassNotFoundException cnf) {
4853
log.error("Class cannot be deserialized, may have been modified beyond version compatibility");
4954
}
55+
if( DEBUG )
56+
log.info("Deserialize return:"+Od);
5057
return Od;
5158

5259
}

src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package org.ros.internal.transport;
22

3-
//import org.jboss.netty.channel.ChannelHandlerContext;
4-
//import org.jboss.netty.channel.ChannelStateEvent;
5-
//import org.jboss.netty.channel.MessageEvent;
6-
73
import org.apache.commons.logging.Log;
84
import org.apache.commons.logging.LogFactory;
95
import org.ros.concurrent.ListenerGroup;
@@ -29,7 +25,10 @@ public BaseClientHandshakeHandler(ClientHandshake clientHandshake, ExecutorServi
2925
this.clientHandshake = clientHandshake;
3026
clientHandshakeListeners = new ListenerGroup<ClientHandshakeListener>(executorService);
3127
}
32-
28+
/**
29+
* Primarily services?
30+
* @param clientHandshakeListener
31+
*/
3332
public void addListener(ClientHandshakeListener clientHandshakeListener) {
3433
clientHandshakeListeners.add(clientHandshakeListener);
3534
}

src/main/java/org/ros/internal/transport/ChannelHandlerContext.java

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public interface ChannelHandlerContext {
3535
* <p>
3636
* This will result in having the
3737
* {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, CompletionHandler)} method
38-
* called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
38+
* called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
3939
* {@link Channel}.
4040
*/
4141
AsynchronousSocketChannel bind(SocketAddress localAddress) throws IOException;
@@ -50,7 +50,7 @@ public interface ChannelHandlerContext {
5050
* <p>
5151
* This will result in having the
5252
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)}
53-
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
53+
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
5454
* {@link Channel}.
5555
*/
5656
void connect(SocketAddress remoteAddress);
@@ -61,7 +61,7 @@ public interface ChannelHandlerContext {
6161
* an error.
6262
* <p>
6363
* This will result in having the
64-
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)}
64+
* {@link ChannelHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)}
6565
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
6666
* {@link Channel}.
6767
* @throws IOException
@@ -75,11 +75,12 @@ public interface ChannelHandlerContext {
7575
* either because the operation was successful or because of an error.
7676
* <p>
7777
* This will result in having the
78-
* {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, CompletionHandler)}
79-
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
78+
* {@link ChannelHandler#disconnect(ChannelHandlerContext, CompletionHandler)}
79+
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
8080
* {@link Channel}.
81+
* @throws IOException
8182
*/
82-
Future<Void> disconnect();
83+
void disconnect() throws IOException;
8384

8485
/**
8586
* Request to close the {@link Channel} and notify the {@link Future} once the operation completes,
@@ -89,8 +90,8 @@ public interface ChannelHandlerContext {
8990
* After it is closed it is not possible to reuse it again.
9091
* <p>
9192
* This will result in having the
92-
* {@link ChannelOutboundHandler#close(ChannelHandlerContext, CompletionHandler)}
93-
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
93+
* {@link ChannelHandler#close(ChannelHandlerContext, CompletionHandler)}
94+
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
9495
* {@link Channel}.
9596
* @throws IOException
9697
*/
@@ -104,11 +105,24 @@ public interface ChannelHandlerContext {
104105
* handler can decide to continue reading. If there's a pending read operation already, this method does nothing.
105106
* <p>
106107
* This will result in having the
107-
* {@link ChannelOutboundHandler#read(ChannelHandlerContext)}
108-
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
108+
* {@link ChannelHandler#read(ChannelHandlerContext)}
109+
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
109110
* {@link Channel}.
110111
*/
111112
Future<Integer> read(ByteBuffer buf);
113+
/**
114+
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
115+
* {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was
116+
* read, and triggers a
117+
* {@link ChannelHandler#channelReadComplete(ChannelHandlerContext) channelReadComplete} event so the
118+
* handler can decide to continue reading. If there's a pending read operation already, this method does nothing.
119+
* <p>
120+
* This will result in having the
121+
* {@link ChannelHandler#read(ChannelHandlerContext)}
122+
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
123+
* {@link Channel}.
124+
*/
125+
void read(ByteBuffer buf,CompletionHandler<Integer, Void> completionHandler);
112126

113127
/**
114128
* Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
@@ -122,22 +136,8 @@ public interface ChannelHandlerContext {
122136
* This method will not request to actual flush, so be sure to call {@link #flush()}
123137
* once you want to request to flush all pending data to the actual transport.
124138
*/
125-
Future<Void> write(Object msg, CompletionHandler<Integer,Void> handler);
126-
127-
/**
128-
* Request to flush all pending messages via this ChannelOutboundInvoker.
129-
*/
130-
ChannelHandlerContext flush();
131-
132-
/**
133-
* Shortcut for call {@link #write(Object, CompletionHandler)} and {@link #flush()}.
134-
*/
135-
Future<Void> writeAndFlush(Object msg, CompletionHandler<Integer,Void> handler);
139+
void write(Object msg, CompletionHandler<Integer,Void> handler);
136140

137-
/**
138-
* Shortcut for call {@link #write(Object)} and {@link #flush()}.
139-
*/
140-
Future<Integer> writeAndFlush(Object msg);
141141

142142
/**
143143
* Return the assigned {@link ChannelPipeline}
@@ -150,7 +150,15 @@ public interface ChannelHandlerContext {
150150
*/
151151
AsynchronousChannelGroup getChannelGroup();
152152

153+
/**
154+
* Determine if this channel is ready for processing, it is configured, has a socket
155+
* and the communication is sound. If the socket breaks this goes false and no writes are
156+
* performed to this channel
157+
*/
158+
boolean isReady();
159+
160+
void setReady(boolean ready);
153161

154-
162+
Object getChannelCompletionMutex();
155163

156164
}

src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
2929
Executor executor;
3030
AsynchronousSocketChannel channel;
3131
ChannelPipeline pipeline;
32+
boolean ready = false;
33+
Object mutex = new Object();
3234

3335

3436
public ChannelHandlerContextImpl(AsynchronousChannelGroup grp, AsynchronousSocketChannel ch, Executor exc) {
@@ -72,8 +74,8 @@ public void connect(SocketAddress remoteAddress, SocketAddress localAddress) thr
7274

7375

7476
@Override
75-
public Future<Void> disconnect() {
76-
throw new RuntimeException("nope..");
77+
public void disconnect() throws IOException {
78+
channel.close();
7779
}
7880

7981
@Override
@@ -93,26 +95,15 @@ public Future<Integer> write(Object msg) {
9395
}
9496

9597
@Override
96-
public Future<Void> write(Object msg,
97-
CompletionHandler<Integer, Void> handler) {
98-
throw new RuntimeException("nope..");
98+
public void write(Object msg, CompletionHandler<Integer, Void> handler) {
99+
channel.write((ByteBuffer)msg, null, handler);
99100
}
100101

101102
@Override
102-
public ChannelHandlerContext flush() {
103-
throw new RuntimeException("nope..");
103+
public void read(ByteBuffer buf, CompletionHandler<Integer, Void> handler) {
104+
channel.read(buf, null, handler);
104105
}
105106

106-
@Override
107-
public Future<Void> writeAndFlush(Object msg,
108-
CompletionHandler<Integer, Void> handler) {
109-
throw new RuntimeException("nope..");
110-
}
111-
112-
@Override
113-
public Future<Integer> writeAndFlush(Object msg) {
114-
return channel.write((ByteBuffer) msg);
115-
}
116107

117108
@Override
118109
public ChannelPipeline pipeline() {
@@ -131,8 +122,19 @@ public Channel channel() {
131122

132123
@Override
133124
public String toString() {
134-
return new String("ChannelHandlerContext:"+channel+" "+channelGroup+" "+executor+" "+pipeline);
125+
return new String("ChannelHandlerContext:"+channel+" "+channelGroup+" "+executor+" "+pipeline+" ready:"+ready);
135126
}
136127

128+
@Override
129+
public boolean isReady() {
130+
return ready;
131+
}
137132

133+
public void setReady(boolean ready) { this.ready = ready;}
134+
135+
/**
136+
* Object to synchronize read and write completion for the channel in this context, since we will have
137+
* multiple outbound writers accessing the same channel
138+
*/
139+
public Object getChannelCompletionMutex() { return mutex; }
138140
}

0 commit comments

Comments
 (0)