Skip to content

Commit b97ec94

Browse files
committed
Netty removal complete. Alpha functionality
1 parent 4ba23b1 commit b97ec94

23 files changed

+512
-162
lines changed

src/main/java/org/ros/internal/node/server/BaseServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void startServer() throws IOException {
4141
startServer(WORKBOOTPORT, address);
4242
}
4343
/**
44-
* Load the methods of main Relatrix class as remotely invokable then we instantiate RelatrixServer
44+
* Start the server
4545
* @param args
4646
* @throws Exception
4747
*/

src/main/java/org/ros/internal/node/server/ThreadPoolManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
import org.apache.commons.logging.Log;
1414
import org.apache.commons.logging.LogFactory;
15-
import org.ros.internal.transport.tcp.TcpRosServer;
1615

1716

1817
/**

src/main/java/org/ros/internal/node/service/ServiceClientHandshakeHandler.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ public ServiceClientHandshakeHandler(ConnectionHeader outgoingConnectionHeader,
4545
@Override
4646
protected void onSuccess(ConnectionHeader incommingConnectionHeader, ChannelHandlerContext ctx) {
4747
ChannelPipeline pipeline = ctx.pipeline();
48-
pipeline.remove(TcpClientPipelineFactory.LENGTH_FIELD_BASED_FRAME_DECODER);
49-
pipeline.remove(ServiceClientHandshakeHandler.this);
48+
//pipeline.remove(TcpClientPipelineFactory.LENGTH_FIELD_BASED_FRAME_DECODER);
49+
pipeline.remove(ServiceClientHandshakeHandler.this); // remove this instance of ChannelHandler
5050
//pipeline.addLast("ResponseDecoder", new ServiceResponseDecoder<S>());
5151
pipeline.addLast("ResponseHandler", new ServiceResponseHandler<S>(responseListeners, executorService));
5252
}
@@ -65,33 +65,37 @@ public String getName() {
6565

6666
@Override
6767
public void handlerAdded(ChannelHandlerContext arg0) throws Exception {
68-
// TODO Auto-generated method stub
68+
if( DEBUG )
69+
log.debug("Handler added "+arg0);
6970

7071
}
7172

7273
@Override
7374
public void handlerRemoved(ChannelHandlerContext arg0) throws Exception {
74-
// TODO Auto-generated method stub
75-
75+
if( DEBUG )
76+
log.debug("Handler removed "+arg0);
7677
}
7778

7879
@Override
7980
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
80-
// TODO Auto-generated method stub
81+
if( DEBUG )
82+
log.debug("Channel Inactive "+ctx);
8183

8284
}
8385

8486
@Override
8587
public void exceptionCaught(ChannelHandlerContext ctx, Throwable th)
8688
throws Exception {
87-
// TODO Auto-generated method stub
89+
if( DEBUG )
90+
log.debug("Exception caught "+ctx+" "+th);
8891

8992
}
9093

9194
@Override
9295
public void userEventTriggered(ChannelHandlerContext ctx, Object event)
9396
throws Exception {
94-
// TODO Auto-generated method stub
97+
if( DEBUG )
98+
log.debug("Event triggly "+ctx+ " "+event);
9599

96100
}
97101

src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public SubscriberHandshakeHandler(ConnectionHeader outgoingConnectionHeader,
3939
super(new SubscriberHandshake(outgoingConnectionHeader), executorService);
4040
this.incomingMessageQueue = incomingMessageQueue;
4141
if( DEBUG )
42-
log.debug("subscriberhandshakeHandler ctor:"+this);
42+
log.info("subscriberhandshakeHandler ctor:"+this);
4343
}
4444

4545
@Override
@@ -69,27 +69,27 @@ public String getName() {
6969
public void exceptionCaught(ChannelHandlerContext arg0, Throwable arg1) throws Exception {
7070
onFailure(arg1.getMessage(), arg0);
7171
if( DEBUG )
72-
log.debug("SubscriberHandshakeHandler.exception caught:"+arg0+" "+arg1);
72+
log.info("SubscriberHandshakeHandler.exception caught:"+arg0+" "+arg1);
7373
}
7474

7575
@Override
7676
public void handlerAdded(ChannelHandlerContext arg0) throws Exception {
7777
if( DEBUG )
78-
log.debug("SubscriberHandshakeHandler.handlerAdded:"+arg0);
78+
log.info("SubscriberHandshakeHandler.handlerAdded:"+arg0);
7979
}
8080

8181
@Override
8282
public void handlerRemoved(ChannelHandlerContext arg0) throws Exception {
8383
if( DEBUG )
84-
log.debug("SubscriberHandshakeHandler.handlerRemoved:"+arg0);
84+
log.info("SubscriberHandshakeHandler.handlerRemoved:"+arg0);
8585

8686
}
8787

8888
@Override
8989
public void userEventTriggered(ChannelHandlerContext ctx, Object event)
9090
throws Exception {
9191
if( DEBUG )
92-
log.debug("SubscriberHandshakeHandler.userEventTriggered:"+ctx+" "+event);
92+
log.info("SubscriberHandshakeHandler.userEventTriggered:"+ctx+" "+event);
9393

9494
}
9595

src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
*
2121
*/
2222
public abstract class BaseClientHandshakeHandler extends AbstractNamedChannelHandler {
23-
private static boolean DEBUG = true;
23+
protected static boolean DEBUG = true;
2424
private static final Log log = LogFactory.getLog(BaseClientHandshakeHandler.class);
2525
private final ClientHandshake clientHandshake;
2626
private final ListenerGroup<ClientHandshakeListener> clientHandshakeListeners;
@@ -41,7 +41,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
4141

4242
@Override
4343
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
44-
log.debug("Channel inactive"+ctx);
44+
log.info("Channel inactive"+ctx);
4545
}
4646

4747
@Override
@@ -61,7 +61,7 @@ public Object channelRead(ChannelHandlerContext ctx, Object buff) throws Excepti
6161
@Override
6262
public void channelReadComplete(ChannelHandlerContext arg0) throws Exception {
6363
if( DEBUG )
64-
log.debug("SubscriberHandshakeHandler.channelReadComplete:"+arg0);
64+
log.info("SubscriberHandshakeHandler.channelReadComplete:"+arg0);
6565

6666
}
6767
/**

src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,15 @@
1212

1313
/**
1414
* A handler context contains all the executor, the channel group, the channel, and the pipeline with the handlers.
15-
* There is one channel per context;
16-
* There is one pipeline per group of contexts
17-
* There is one executor per group of contexts
15+
* There is one channel per context.
16+
* There is one pipeline context.
17+
* There is one executor per group of contexts.
18+
* The pipeline is a stateful collection of handlers that represent the current channel state and means
19+
* of executing functions in the process of connecting, disconnecting, reading, failing etc.
20+
* The pipeline is configured by means of factories that create ChannelInitializers, inserting
21+
* them in order in the pipeline deque.
22+
* The functions of the system move data through the pipeline, triggering the handlers in the sequence they were
23+
* added.
1824
* @author jg
1925
*
2026
*/
@@ -23,15 +29,16 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
2329
Executor executor;
2430
AsynchronousSocketChannel channel;
2531
ChannelPipeline pipeline;
32+
2633
ByteBuffer buf;
2734
int MAXBUF = 2000000;
2835

29-
public ChannelHandlerContextImpl(AsynchronousChannelGroup grp, ChannelPipeline pipe, AsynchronousSocketChannel ch, Executor exc) {
36+
public ChannelHandlerContextImpl(AsynchronousChannelGroup grp, AsynchronousSocketChannel ch, Executor exc) {
3037
channelGroup = grp;
31-
pipeline = pipe;
3238
channel = ch;
3339
executor = exc;
3440
buf = ByteBuffer.allocate(MAXBUF);
41+
pipeline = new ChannelPipelineImpl(this);
3542
}
3643

3744
public void setChannel(AsynchronousSocketChannel sock) {
@@ -125,6 +132,9 @@ public Channel channel() {
125132
return channel;
126133
}
127134

128-
135+
@Override
136+
public String toString() {
137+
return new String("ChannelHandlerContext:"+channel+" "+channelGroup+" "+executor+" "+pipeline);
138+
}
129139

130140
}

src/main/java/org/ros/internal/transport/ChannelPipelineImpl.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.ros.internal.transport;
22

3-
import java.nio.channels.Channel;
43
import java.util.AbstractMap;
54
import java.util.ArrayList;
65
import java.util.Iterator;
@@ -10,28 +9,25 @@
109
import java.util.concurrent.ConcurrentHashMap;
1110
import java.util.concurrent.LinkedBlockingDeque;
1211

13-
import org.ros.internal.transport.tcp.ChannelInitializer;
1412
/**
1513
* Implementation of the ChannelPipeline interface to process the requests via the pluggable ChannelHandlers
16-
* named in the queue
14+
* named in the queue. The pipeline is per ChannelHandlerContext, per channel as it represents state in the form of the
15+
* presence or absence of handlers such as the handshake handler, which disappears after initial handshake.
1716
* @author jg
1817
*
1918
*/
2019
public class ChannelPipelineImpl implements ChannelPipeline {
20+
2121
LinkedBlockingDeque<Entry<String, ChannelHandler>> queue = new LinkedBlockingDeque<Entry<String, ChannelHandler>>();
2222
private ChannelHandlerContext ctx;
2323

2424
public ChannelPipelineImpl(ChannelHandlerContext ctx) {
2525
this.ctx = ctx;
2626
}
27-
public ChannelPipelineImpl() {}
27+
//public ChannelPipelineImpl() {}
2828

2929
public void setContext(ChannelHandlerContext ctx) { this.ctx = ctx; }
3030

31-
public void inject(ChannelInitializer factory) throws Exception {
32-
factory.channelRegistered(ctx);
33-
}
34-
3531

3632
@Override
3733
public Iterator<Entry<String, ChannelHandler>> iterator() {
@@ -244,5 +240,9 @@ public ChannelPipeline fireChannelReadComplete() throws Exception {
244240
return this;
245241
}
246242

243+
@Override
244+
public String toString() {
245+
return new String("[Channel pipeline:"+ctx.channel()+" with "+queue.size()+" handlers]");
246+
}
247247

248248
}

src/main/java/org/ros/internal/transport/queue/MessageReceiver.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,39 +61,39 @@ public void exceptionCaught(ChannelHandlerContext arg0, Throwable arg1)
6161
@Override
6262
public void handlerAdded(ChannelHandlerContext arg0) throws Exception {
6363
if( DEBUG ) {
64-
log.debug("MessageReceiver handler added:"+arg0);
64+
log.info("MessageReceiver handler added:"+arg0);
6565
}
6666

6767
}
6868

6969
@Override
7070
public void handlerRemoved(ChannelHandlerContext arg0) throws Exception {
7171
if( DEBUG ) {
72-
log.debug("MessageReceiver handler removed:"+arg0);
72+
log.info("MessageReceiver handler removed:"+arg0);
7373
}
7474

7575
}
7676

7777
@Override
7878
public void channelActive(ChannelHandlerContext arg0) throws Exception {
7979
if( DEBUG ) {
80-
log.debug("MessageReceiver channel active:"+arg0);
80+
log.info("MessageReceiver channel active:"+arg0);
8181
}
8282

8383
}
8484

8585
@Override
8686
public void channelInactive(ChannelHandlerContext arg0) throws Exception {
8787
if( DEBUG ) {
88-
log.debug("MessageReceiver channel inactive:"+arg0);
88+
log.info("MessageReceiver channel inactive:"+arg0);
8989
}
9090

9191
}
9292

9393
@Override
9494
public void channelReadComplete(ChannelHandlerContext arg0) throws Exception {
9595
if( DEBUG ) {
96-
log.debug("MessageReceiver read complete:"+arg0);
96+
log.info("MessageReceiver read complete:"+arg0);
9797
}
9898

9999
}

src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void loop() throws InterruptedException {
5757
final ByteBuffer buffer = messageBufferPool.acquire();
5858
Utility.serialize(message, buffer);
5959
if (DEBUG ) {
60-
log.info(String.format("Writing %d bytes to %d channels.", buffer.limit(), channelGroup));
60+
log.info(String.format("Writing %d bytes.", buffer.limit()));
6161
}
6262
// we have to wait until the write
6363
// operation is complete before returning the buffer to the pool.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package org.ros.internal.transport.tcp;
2+
3+
import java.io.IOException;
4+
import java.net.InetAddress;
5+
import java.net.InetSocketAddress;
6+
import java.net.Socket;
7+
import java.nio.channels.AsynchronousSocketChannel;
8+
import java.util.concurrent.Future;
9+
10+
import org.apache.commons.logging.Log;
11+
import org.apache.commons.logging.LogFactory;
12+
import org.ros.internal.transport.ChannelHandlerContext;
13+
import org.ros.internal.transport.ChannelHandlerContextImpl;
14+
15+
/**
16+
* Functionally this class Extends AsynchTCPServer, takes connections and spins the worker thread to handle each one
17+
* @author jg Copyright (C) NeoCoreTechs 2016
18+
*
19+
*/
20+
public final class AsynchBaseServer extends AsynchTCPServer {
21+
private static boolean DEBUG = true;
22+
private static final Log log = LogFactory.getLog(AsynchBaseServer.class);
23+
public int WORKBOOTPORT = 0;
24+
public InetSocketAddress address = null;
25+
private TcpRosServer tcpserver = null;
26+
27+
28+
public AsynchBaseServer(TcpRosServer server) throws IOException {
29+
super();
30+
this.address = server.getAddress();
31+
this.tcpserver = server;
32+
}
33+
/**
34+
* Construct the Server, fill in port and address later.
35+
* @throws IOException
36+
* @throws ClassNotFoundException
37+
*/
38+
public AsynchBaseServer() throws IOException, ClassNotFoundException {
39+
super();
40+
}
41+
42+
public void startServer() throws IOException {
43+
if( address == null )
44+
throw new IOException("Server address not defined, can not start Base Server");
45+
startServer(channelGroup, exc, address);
46+
}
47+
/**
48+
* Start the server
49+
* @param args
50+
* @throws Exception
51+
*/
52+
public static void main(String args[]) throws Exception {
53+
AsynchBaseServer bs = new AsynchBaseServer();
54+
log.info("ROSLite Asynch transport Server started on "+InetAddress.getLocalHost().getHostName()+" port "+bs.WORKBOOTPORT);
55+
}
56+
57+
public void run() {
58+
while(!shouldStop) {
59+
try {
60+
Future<AsynchronousSocketChannel> channel = server.accept();
61+
if( DEBUG ) {
62+
log.info("Accept "+channel);
63+
}
64+
ChannelHandlerContext ctx = new ChannelHandlerContextImpl(channelGroup, channel.get(), exc);
65+
tcpserver.getSubscribers().add(ctx);
66+
ctx.pipeline().fireChannelActive();
67+
// inject the handlers, start handshake
68+
tcpserver.getFactoryStack().inject(ctx);
69+
ctx.pipeline().fireChannelRegistered();
70+
AsynchTCPWorker uworker = new AsynchTCPWorker(channel.get(), tcpserver);
71+
exc.execute(uworker);
72+
if( DEBUG ) {
73+
log.info("ROS Asynch transport server worker starting");
74+
}
75+
} catch(Exception e) {
76+
log.error("Asynch Server socket accept exception "+e,e);
77+
}
78+
}
79+
80+
}
81+
82+
public Integer getPort() {
83+
return WORKBOOTPORT;
84+
}
85+
86+
public String toString() {
87+
return "AsynchBaseServer for "+ tcpserver;
88+
}
89+
90+
}

0 commit comments

Comments
 (0)