Skip to content

Commit c3092bc

Browse files
committed
Fixed subscriber shutdown.
1 parent b936d6f commit c3092bc

File tree

4 files changed

+31
-7
lines changed

4 files changed

+31
-7
lines changed

src/main/java/org/ros/internal/node/client/Registrar.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* which creates a SlaveClient of type SlaveRpcEndpointImpl to
3838
* contact the publishers. If successful, call signalOnMasterRegistrationSuccess for the subscriber.
3939
*
40-
* @author jg
40+
* @author Jonathan Groff Copyright (C) NeoCoreTechs 2015,2021
4141
*/
4242
public class Registrar implements TopicParticipantManagerListener, ServiceManagerListener {
4343

@@ -252,7 +252,15 @@ public void run() {
252252
});
253253
}
254254
}
255-
255+
256+
@Override
257+
public void onSubscriberRemoved(final DefaultSubscriber<?> subscriber, boolean remove) {
258+
if (DEBUG) {
259+
log.info("Unregistering subscriber for shutdown: " + subscriber);
260+
}
261+
masterClient.unregisterSubscriber(nodeIdentifier, subscriber);
262+
}
263+
256264
@Override
257265
public void onServiceServerAdded(final DefaultServiceServer<?, ?> serviceServer) {
258266
if (DEBUG) {

src/main/java/org/ros/internal/node/topic/SubscriberFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void onNewPublisher(Subscriber<T> subscriber, PublisherIdentifier publish
5858
@Override
5959
public void onShutdown(Subscriber<T> subscriber) {
6060
assert(subscriber != null );
61-
topicParticipantManager.removeSubscriber((DefaultSubscriber<T>) subscriber);
61+
topicParticipantManager.removeSubscriber((DefaultSubscriber<T>) subscriber, true);
6262
}
6363
});
6464
topicParticipantManager.addSubscriber(subscriber);

src/main/java/org/ros/internal/node/topic/TopicParticipantManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import java.util.ArrayList;
1010
import java.util.Collection;
11-
import java.util.HashMap;
1211
import java.util.List;
1312
import java.util.Map;
1413
import java.util.concurrent.ConcurrentHashMap;
@@ -104,7 +103,16 @@ public void removeSubscriber(DefaultSubscriber<?> subscriber) {
104103
listener.onSubscriberRemoved(subscriber);
105104
}
106105
}
107-
106+
/**
107+
* Variation for removal of subscriber during executor service shutdown.<p/>
108+
* Dont fire onSubscriberRemoved method of Registrar, which resubmits
109+
* @param subscriber
110+
* @param b
111+
*/
112+
public void removeSubscriber(DefaultSubscriber<?> subscriber, boolean b) {
113+
subscribers.remove(subscriber.getTopicName());
114+
}
115+
108116
public void addSubscriberConnection(DefaultSubscriber<?> subscriber, PublisherIdentifier publisherIdentifier) {
109117
if(DEBUG)
110118
log.info("Connecting subscriber:"+subscriber+" to publisher Identifier:"+publisherIdentifier+" for "+this);
@@ -163,4 +171,6 @@ public Collection<DefaultPublisher<?>> getPublishers() {
163171
public Collection<SubscriberIdentifier> getPublisherConnections(DefaultPublisher<?> publisher) {
164172
return publisherConnections.get(publisher);
165173
}
174+
175+
166176
}

src/main/java/org/ros/internal/node/topic/TopicParticipantManagerListener.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,14 @@ public interface TopicParticipantManagerListener {
3737
/**
3838
* Called when a {@link Subscriber} is removed.
3939
*
40-
* @param subscriber
41-
* the {@link Subscriber} that was removed
40+
* @param subscriber the {@link Subscriber} that was removed
4241
*/
4342
void onSubscriberRemoved(DefaultSubscriber<?> subscriber);
43+
44+
/**
45+
* Called when a {@link Subscriber} is removed for shutdown.
46+
*
47+
* @param subscriber the {@link Subscriber} that was removed
48+
*/
49+
void onSubscriberRemoved(DefaultSubscriber<?> subscriber, boolean remove);
4450
}

0 commit comments

Comments
 (0)