Skip to content

Commit 3fb5a3c

Browse files
committed
Funcional service invocation!
1 parent 99755e2 commit 3fb5a3c

File tree

6 files changed

+100
-39
lines changed

6 files changed

+100
-39
lines changed

src/main/java/org/ros/internal/node/service/DefaultServiceClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import org.apache.commons.logging.Log;
44
import org.apache.commons.logging.LogFactory;
5-
import org.ros.internal.message.MessageBufferPool;
5+
//import org.ros.internal.message.MessageBufferPool;
66
import org.ros.internal.transport.ClientHandshakeListener;
77
import org.ros.internal.transport.ConnectionHeader;
88
import org.ros.internal.transport.ConnectionHeaderFields;
@@ -70,7 +70,7 @@ public void reset() {
7070
private final ServiceDeclaration serviceDeclaration;
7171

7272
private final MessageFactory messageFactory;
73-
private final MessageBufferPool messageBufferPool;
73+
//private final MessageBufferPool messageBufferPool;
7474
private final Queue<ServiceResponseListener<S>> responseListeners;
7575
private final ConnectionHeader connectionHeader;
7676
private final TcpClientManager tcpClientManager;
@@ -89,7 +89,7 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla
8989
MessageFactory messageFactory, ScheduledExecutorService executorService) throws IOException {
9090
this.serviceDeclaration = serviceDeclaration;
9191
this.messageFactory = messageFactory;
92-
messageBufferPool = new MessageBufferPool();
92+
//messageBufferPool = new MessageBufferPool();
9393
responseListeners = new LinkedList<ServiceResponseListener<S>>();
9494
connectionHeader = new ConnectionHeader();
9595
connectionHeader.addField(ConnectionHeaderFields.CALLER_ID, nodeName.toString());

src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
package org.ros.internal.node.service;
22

3+
import org.apache.commons.logging.LogFactory;
34
//import org.jboss.netty.buffer.ChannelBuffer;
45
//import org.jboss.netty.buffer.ChannelBuffers;
56
//import org.jboss.netty.channel.ChannelHandlerContext;
67
//import org.jboss.netty.channel.MessageEvent;
78
//import org.jboss.netty.channel.SimpleChannelHandler;
89
import org.ros.exception.RosRuntimeException;
910
import org.ros.exception.ServiceException;
10-
import org.ros.internal.message.MessageBufferPool;
11+
import org.ros.internal.message.MessageBuffers;
12+
//import org.ros.internal.message.MessageBufferPool;
1113
import org.ros.internal.system.Utility;
1214
import org.ros.internal.transport.ChannelHandler;
1315
import org.ros.internal.transport.ChannelHandlerContext;
1416
import org.ros.message.MessageFactory;
1517
import org.ros.node.service.ServiceResponseBuilder;
1618

19+
import rosgraph_msgs.Log;
20+
1721
import java.io.IOException;
1822
import java.nio.ByteBuffer;
1923
import java.nio.charset.Charset;
@@ -23,13 +27,14 @@
2327
* @author jg
2428
*/
2529
class ServiceRequestHandler<T, S> implements ChannelHandler {
26-
30+
private static final org.apache.commons.logging.Log log = LogFactory.getLog(ServiceRequestHandler.class);
2731
private final ServiceDeclaration serviceDeclaration;
2832
private final ServiceResponseBuilder<T, S> responseBuilder;
2933

3034
private final MessageFactory messageFactory;
3135
private final ExecutorService executorService;
32-
private final MessageBufferPool messageBufferPool;
36+
//private final ByteBuffer messageBuffer = MessageBuffers.dynamicBuffer();
37+
//private final MessageBufferPool messageBufferPool;
3338

3439
public ServiceRequestHandler(ServiceDeclaration serviceDeclaration,
3540
ServiceResponseBuilder<T, S> responseBuilder, MessageFactory messageFactory,
@@ -38,26 +43,35 @@ public ServiceRequestHandler(ServiceDeclaration serviceDeclaration,
3843
this.responseBuilder = responseBuilder;
3944
this.messageFactory = messageFactory;
4045
this.executorService = executorService;
41-
messageBufferPool = new MessageBufferPool();
46+
// messageBufferPool = new MessageBufferPool();
4247
}
4348

44-
private void handleRequest(T request, ByteBuffer responseBuffer) throws ServiceException {
49+
private S handleRequest(T request) throws ServiceException {
4550
S response = messageFactory.newFromType(serviceDeclaration.getType());
4651
responseBuilder.build(request, response);
47-
Utility.serialize(response, responseBuffer);
52+
return response;
4853
}
4954

50-
private void handleSuccess(final ChannelHandlerContext ctx, ServiceServerResponse response, ByteBuffer responseBuffer) {
55+
private void handleSuccess(final ChannelHandlerContext ctx, S result, ServiceServerResponse response, ByteBuffer responseBuffer) {
5156
response.setErrorCode(1);
52-
response.setMessageLength(responseBuffer.limit());
53-
response.setMessageBytes(responseBuffer.array());
54-
ByteBuffer resbuf = messageBufferPool.acquire();
55-
Utility.serialize(response, resbuf);
57+
ByteBuffer resbuf = MessageBuffers.dynamicBuffer(); // allocate for serialized result of service method
58+
Utility.serialize(result, resbuf);
59+
byte[] b = new byte[resbuf.limit()];
60+
resbuf.get(b);
61+
response.setMessageBytes(b);
62+
response.setMessageLength(response.getMessageBytes().length);
63+
responseBuffer.putInt(response.getErrorCode());
64+
responseBuffer.putInt(response.getMessageLength());
65+
log.info("Response to be serialized:"+response);
66+
Utility.serialize(response, responseBuffer);
67+
//log.info("ServiceRequestHandler serializing message buffer "+responseBuffer+
68+
// " with payload "+response.getMessageBytes().length);
5669
try {
57-
ctx.write(resbuf.array());
70+
ctx.write(responseBuffer.array());
5871
} catch (IOException e) {
5972
throw new RosRuntimeException(e);
6073
}
74+
MessageBuffers.returnBuffer(resbuf);
6175
}
6276

6377
private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response, String message, ByteBuffer responseBuffer) {
@@ -66,6 +80,8 @@ private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse
6680
response.setMessageLength(encodedMessage.limit());
6781
response.setMessage(encodedMessage);
6882
response.setMessageBytes(encodedMessage.array());
83+
responseBuffer.putInt(response.getErrorCode());
84+
responseBuffer.putInt(response.getMessageLength());
6985
Utility.serialize(response, responseBuffer);
7086
try {
7187
ctx.write(responseBuffer.array());
@@ -84,19 +100,21 @@ public Object channelRead(final ChannelHandlerContext ctx, Object e) throws Exce
84100
@Override
85101
public void run() {
86102
ServiceServerResponse response = new ServiceServerResponse();
87-
ByteBuffer responseBuffer = messageBufferPool.acquire();
103+
ByteBuffer responseBuffer = MessageBuffers.dynamicBuffer();
88104
boolean success;
105+
S result = null;
89106
try {
90-
handleRequest(requestBuffer, responseBuffer);
107+
result = handleRequest(requestBuffer);
91108
success = true;
92109
} catch (ServiceException ex) {
93110
handleError(ctx, response, ex.getMessage(), responseBuffer);
94111
success = false;
95112
}
96113
if (success) {
97-
handleSuccess(ctx, response, responseBuffer);
114+
handleSuccess(ctx, result, response, responseBuffer);
98115
}
99-
messageBufferPool.release(responseBuffer);
116+
//messageBufferPool.release(responseBuffer);
117+
MessageBuffers.returnBuffer(responseBuffer);
100118
}
101119
});
102120
return requestBuffer;

src/main/java/org/ros/internal/node/service/ServiceResponseDecoder.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,36 @@ protected void decode(int code, ChannelHandlerContext ctx, ByteBuffer buffer, Li
3333
switch (code) {
3434
case ERROR_CODE:
3535
response.setErrorCode(buffer.getInt());
36+
try {
37+
if(rstate.size() > 0)
38+
rstate.add(0, response);
39+
else
40+
rstate.add(response);
41+
return;
42+
} finally {
43+
reset();
44+
}
3645
//checkpoint(ServiceResponseDecoderState.MESSAGE_LENGTH);
3746
case MESSAGE_LENGTH:
3847
response.setMessageLength(buffer.getInt());
48+
try {
49+
if(rstate.size() > 1)
50+
rstate.add(1, response);
51+
else
52+
rstate.add(response);
53+
return;
54+
} finally {
55+
reset();
56+
}
3957
// checkpoint(ServiceResponseDecoderState.MESSAGE);
4058
case MESSAGE:
4159
response.setMessage(buffer);
4260
try {
43-
//return response;
44-
rstate.add(response);
61+
if(rstate.size() > 2)
62+
rstate.add(2, response);
63+
else
64+
rstate.add(response);
65+
return;
4566
} finally {
4667
reset();
4768
}

src/main/java/org/ros/internal/node/service/ServiceResponseHandler.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
//import org.jboss.netty.channel.SimpleChannelHandler;
99
import org.ros.exception.RemoteException;
1010
import org.ros.internal.message.MessageBuffers;
11+
import org.ros.internal.message.service.ServiceDefinitionResourceProvider;
12+
import org.ros.internal.message.service.ServiceRequestMessageFactory;
13+
import org.ros.internal.message.service.ServiceResponseMessageFactory;
14+
import org.ros.internal.node.response.Response;
1115
import org.ros.internal.node.response.StatusCode;
1216
import org.ros.internal.system.Utility;
1317
import org.ros.internal.transport.ChannelHandler;
@@ -23,6 +27,13 @@
2327

2428
/**
2529
* A handler for service responses.<p/>
30+
* This handler lives on the pipe line and can be retrieved as follows:
31+
* ChannelHandler ch = ctx.pipeline().get("ResponseHandler");
32+
* {@code ServiceResponseHandler<ResponseType> srh = (ServiceResponseHandler<ResponseType>)ch;}
33+
* It arrives there by a successful handshake via the {@code ServiceClientHandshakeHandler.onSuccess()}
34+
* which is called with the ConnectionHeader and ChannelHandlerContext as parameters.<p/>
35+
* The ctx.pipeLine() method retrieves the pipe line and and addList is invoked using the literal "ResponseHandler"
36+
* as the key and a new instance of ServiceResponseHandler<S> is created to use as value.<p/>
2637
* The handler revolves around the encoder and the decoder much like the pub/sub model.
2738
* The encoder and decoder work with the channel handler context and the event model to deliver
2839
* requests and responses on and off the bus.
@@ -96,35 +107,33 @@ public ServiceResponseHandler(Queue<ServiceResponseListener<ResponseType>> messa
96107
*/
97108
@Override
98109
public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception {
99-
log.info("ServiceResponseHandler channelRead for ChannelHandlerContext:"+ctx+" using Object:"+e);
100110
final ServiceResponseListener<ResponseType> listener = responseListeners.poll();
101111
assert(listener != null) : "No listener for incoming service response.";
102112
final ByteBuffer buffer = ByteBuffer.wrap((byte[]) e);
113+
ServiceServerResponse response = (ServiceServerResponse) Utility.deserialize(buffer);
114+
log.info("ServiceResponseHandler channelRead for ChannelHandlerContext:"+ctx+" with ServiceServerResponse:"+response);
103115
final ServiceResponseDecoder decoder = new ServiceResponseDecoder();
104-
final List<Object> rstate = new ArrayList<Object>();
105-
ServiceServerResponse response = new ServiceServerResponse();
116+
//final List<Object> rstate = new ArrayList<Object>();
106117
executorService.execute(new Runnable() {
107118
@Override
108119
public void run() {
109120
try {
110-
decoder.decode(ServiceResponseDecoderState.ERROR_CODE.ordinal(), ctx, buffer, rstate);
111-
decoder.decode(ServiceResponseDecoderState.MESSAGE_LENGTH.ordinal(), ctx, buffer, rstate);
112-
decoder.decode(ServiceResponseDecoderState.MESSAGE.ordinal(), ctx, buffer, rstate);
113-
ServiceServerResponse sresponse = (ServiceServerResponse) rstate.get(0);
114-
if (sresponse.getErrorCode() != ServiceResponseDecoderState.ERROR_CODE.ordinal()) {
115-
sresponse = (ServiceServerResponse) rstate.get(2);
121+
//decoder.decode(ServiceResponseDecoderState.ERROR_CODE.ordinal(), ctx, buffer, rstate);
122+
//decoder.decode(ServiceResponseDecoderState.MESSAGE_LENGTH.ordinal(), ctx, buffer, rstate);
123+
//decoder.decode(ServiceResponseDecoderState.MESSAGE.ordinal(), ctx, buffer, rstate);
124+
//ServiceServerResponse sresponse = (ServiceServerResponse) rstate.get(0);
125+
if (response.getErrorCode() != ServiceResponseDecoderState.ERROR_CODE.ordinal()) {
116126
// TODO UDP transport?
117127
//sresponse = (ServiceServerResponse) rstate.get(1);
118128
//int messageLength = sresponse.getMessageLength();
119-
listener.onSuccess((ResponseType) sresponse);
129+
//Response.fromListChecked(sresponse, resultFactory)
130+
Object o = Utility.deserialize(ByteBuffer.wrap(response.getMessageBytes()));
131+
log.info("About to fire successful call with response from service. Class:"+o.getClass()+" payload:"+o);
132+
listener.onSuccess( (ResponseType) o);
120133
} else {
121-
sresponse = (ServiceServerResponse) rstate.get(2);
122-
String message = Charset.forName("US-ASCII").decode(sresponse.getMessage()).toString();
134+
String message = Charset.forName("US-ASCII").decode(response.getMessage()).toString();
123135
listener.onFailure(new RemoteException(StatusCode.ERROR, message));
124136
}
125-
response.setErrorCode(sresponse.getErrorCode());
126-
response.setMessageLength(sresponse.getMessageLength());
127-
response.setMessage(sresponse.getMessage());
128137
} catch (Exception e1) {
129138
log.error("Error:"+e1+" decoding ServiceResponse for context:"+ctx+" using proposed ServiceResponse:"+e);
130139
e1.printStackTrace();
@@ -133,7 +142,8 @@ public void run() {
133142
});
134143
return response;
135144
}
136-
145+
146+
137147
@Override
138148
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
139149
// TODO Auto-generated method stub

src/main/java/org/ros/internal/node/service/ServiceServerResponse.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,17 @@ public int getMessageLength() {
4848
public byte[] getMessageBytes() { return messageBytes; }
4949

5050
public void setMessageBytes(byte[] b) { messageBytes = b; }
51+
52+
public String toString() {
53+
StringBuilder sb = new StringBuilder();
54+
sb.append(this.getClass().getName());
55+
sb.append(" Error Code:");
56+
sb.append(errorCode);
57+
sb.append(" Message Length:");
58+
sb.append(messageLength);
59+
sb.append(" Message size:");
60+
sb.append(messageBytes.length);
61+
return sb.toString();
62+
}
5163

5264
}

src/main/java/org/ros/node/NodeConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
* @author ethan.rublee@gmail.com (Ethan Rublee)
5252
* @author kwc@willowgarage.com (Ken Conley)
5353
* @author damonkohler@google.com (Damon Kohler)
54-
* @author jg
54+
* @author groffj@neocoretechs.com (Jonathan Neville Groff)
5555
*/
5656
public class NodeConfiguration {
5757

@@ -61,7 +61,7 @@ public class NodeConfiguration {
6161
public static final InetSocketAddress DEFAULT_MASTER_URI;
6262

6363
static {
64-
DEFAULT_MASTER_URI = new InetSocketAddress("172.16.0.101", 8090);
64+
DEFAULT_MASTER_URI = new InetSocketAddress("127.0.0.1", 8090);
6565
}
6666

6767
private NameResolver parentResolver;

0 commit comments

Comments
 (0)