Skip to content

Commit 84c7288

Browse files
committed
Fix adding local endpoints in discovery: match existing endpoints to them (if possible)
1 parent 7ae5548 commit 84c7288

File tree

2 files changed

+172
-128
lines changed

2 files changed

+172
-128
lines changed

src/discovery/discovery.rs

Lines changed: 142 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ mod with_key {
135135
use mio_extras::timer::Timer;
136136

137137
use super::{DataReaderPlCdr, DataWriterPlCdr};
138-
use crate::{
139-
polling::TimerPolicy, serialization::pl_cdr_adapters::*, Key, Keyed, Topic, TopicKind,
140-
};
138+
use crate::{polling::TimerPolicy, serialization::pl_cdr_adapters::*, Key, Keyed, Topic, TopicKind};
141139

142140
pub const TOPIC_KIND: TopicKind = TopicKind::WithKey;
143141

@@ -751,10 +749,10 @@ impl Discovery {
751749
return; // terminate event loop
752750
}
753751
DiscoveryCommand::AddLocalWriter { guid } => {
754-
self.sedp_publish_single_writer(guid);
752+
self.add_local_writer(guid);
755753
}
756754
DiscoveryCommand::AddLocalReader { guid } => {
757-
self.sedp_publish_single_reader(guid);
755+
self.add_local_reader(guid);
758756
}
759757
DiscoveryCommand::AddTopic { topic_name } => {
760758
self.sedp_publish_topic(&topic_name);
@@ -1725,154 +1723,170 @@ impl Discovery {
17251723
discovery_db_write(&self.discovery_db).topic_cleanup();
17261724
}
17271725

1728-
pub fn sedp_publish_single_reader(&self, guid: GUID) {
1729-
let db = discovery_db_read(&self.discovery_db);
1730-
if let Some(reader_data) = db.get_local_topic_reader(guid) {
1731-
if !reader_data
1732-
.reader_proxy
1733-
.remote_reader_guid
1734-
.entity_id
1735-
.kind()
1736-
.is_user_defined()
1737-
{
1738-
// Only readers of user-defined topics are published to discovery
1739-
return;
1740-
}
1726+
fn sedp_publish_single_reader(&self, reader_data: &DiscoveredReaderData) {
1727+
if !reader_data
1728+
.reader_proxy
1729+
.remote_reader_guid
1730+
.entity_id
1731+
.kind()
1732+
.is_user_defined()
1733+
{
1734+
// Only readers of user-defined topics are published to discovery
1735+
return;
1736+
}
17411737

1742-
#[cfg(not(feature = "security"))]
1743-
let do_nonsecure_write = true;
1738+
#[cfg(not(feature = "security"))]
1739+
let do_nonsecure_write = true;
17441740

1745-
#[cfg(feature = "security")]
1746-
let do_nonsecure_write = if let Some(security) = self.security_opt.as_ref() {
1747-
security.sedp_publish_single_reader(
1748-
&self.dcps_subscription.writer,
1749-
&self.dcps_subscriptions_secure.writer,
1750-
reader_data,
1751-
);
1752-
false
1753-
} else {
1754-
true // No security configured
1755-
};
1741+
#[cfg(feature = "security")]
1742+
let do_nonsecure_write = if let Some(security) = self.security_opt.as_ref() {
1743+
security.sedp_publish_single_reader(
1744+
&self.dcps_subscription.writer,
1745+
&self.dcps_subscriptions_secure.writer,
1746+
reader_data,
1747+
);
1748+
false
1749+
} else {
1750+
true // No security configured
1751+
};
17561752

1757-
if do_nonsecure_write {
1758-
match self
1759-
.dcps_subscription
1760-
.writer
1761-
.write(reader_data.clone(), None)
1762-
{
1763-
Ok(()) => {
1764-
debug!(
1765-
"Published DCPSSubscription data on topic {}, reader guid {:?}, data {:?}",
1766-
reader_data.subscription_topic_data.topic_name(),
1767-
guid,
1768-
reader_data,
1769-
);
1770-
}
1771-
Err(e) => {
1772-
error!(
1773-
"Failed to publish DCPSSubscription data on topic {}, reader guid {:?}. Error: {e}",
1774-
reader_data.subscription_topic_data.topic_name(),
1775-
guid
1776-
);
1777-
// TODO: try again later?
1778-
}
1753+
if do_nonsecure_write {
1754+
match self
1755+
.dcps_subscription
1756+
.writer
1757+
.write(reader_data.clone(), None)
1758+
{
1759+
Ok(()) => {
1760+
debug!(
1761+
"Published DCPSSubscription data on topic {}, reader guid {:?}, data {:?}",
1762+
reader_data.subscription_topic_data.topic_name(),
1763+
reader_data.reader_proxy.remote_reader_guid,
1764+
reader_data,
1765+
);
1766+
}
1767+
Err(e) => {
1768+
error!(
1769+
"Failed to publish DCPSSubscription data on topic {}, reader guid {:?}. Error: {e}",
1770+
reader_data.subscription_topic_data.topic_name(),
1771+
reader_data.reader_proxy.remote_reader_guid
1772+
);
1773+
// TODO: try again later?
17791774
}
17801775
}
1781-
} else {
1782-
warn!("Did not find a local reader {guid:?}");
17831776
}
17841777
}
17851778

1786-
pub fn sedp_publish_readers(&self) {
1779+
fn sedp_publish_readers(&self) {
17871780
let db = discovery_db_read(&self.discovery_db);
1788-
let local_user_reader_guids = db
1789-
.get_all_local_topic_readers()
1790-
.filter(|p| {
1791-
p.reader_proxy
1792-
.remote_reader_guid
1793-
.entity_id
1794-
.kind()
1795-
.is_user_defined()
1796-
})
1797-
.map(|drd| drd.reader_proxy.remote_reader_guid);
1781+
for reader in db.get_all_local_topic_readers() {
1782+
self.sedp_publish_single_reader(reader);
1783+
}
1784+
}
1785+
1786+
fn add_local_writer(&self, guid: GUID) {
1787+
// Get writer data from db
1788+
let db = discovery_db_read(&self.discovery_db);
1789+
let writer_data = match db.get_local_topic_writer(guid) {
1790+
Some(d) => d,
1791+
None => {
1792+
warn!("Did not find a local writer {guid:?}");
1793+
return;
1794+
}
1795+
};
17981796

1799-
for guid in local_user_reader_guids {
1800-
self.sedp_publish_single_reader(guid);
1797+
// Publish it to SEDP (if needed)
1798+
self.sedp_publish_single_writer(writer_data);
1799+
1800+
// Send the ReaderUpdated notification on any existing readers on the writer's topic
1801+
// This will result in matching the endpoints if possible
1802+
let existing_readers = db.readers_on_topic(&writer_data.publication_topic_data.topic_name());
1803+
for reader in existing_readers {
1804+
self.send_discovery_notification(DiscoveryNotificationType::ReaderUpdated {
1805+
discovered_reader_data: reader.clone(),
1806+
});
18011807
}
18021808
}
18031809

1804-
pub fn sedp_publish_single_writer(&self, guid: GUID) {
1810+
fn add_local_reader(&self, guid: GUID) {
1811+
// Get reader data from db
18051812
let db = discovery_db_read(&self.discovery_db);
1806-
if let Some(writer_data) = db.get_local_topic_writer(guid) {
1807-
if !writer_data
1808-
.writer_proxy
1809-
.remote_writer_guid
1810-
.entity_id
1811-
.kind()
1812-
.is_user_defined()
1813-
{
1814-
// Only writers of user-defined topics are published to discovery
1813+
let reader_data = match db.get_local_topic_reader(guid) {
1814+
Some(d) => d,
1815+
None => {
1816+
warn!("Did not find a local reader {guid:?}");
18151817
return;
18161818
}
1819+
};
18171820

1818-
#[cfg(not(feature = "security"))]
1819-
let do_nonsecure_write = true;
1821+
// Publish it to SEDP (if needed)
1822+
self.sedp_publish_single_reader(reader_data);
18201823

1821-
#[cfg(feature = "security")]
1822-
let do_nonsecure_write = if let Some(security) = self.security_opt.as_ref() {
1823-
security.sedp_publish_single_writer(
1824-
&self.dcps_publication.writer,
1825-
&self.dcps_publications_secure.writer,
1826-
writer_data,
1827-
);
1828-
false
1829-
} else {
1830-
true // No security configured
1831-
};
1824+
// Send the WriterUpdated notification on any existing writers on the readers's topic
1825+
// This will result in matching the endpoints if possible
1826+
let existing_writers = db.writers_on_topic(&reader_data.subscription_topic_data.topic_name());
1827+
for writer in existing_writers {
1828+
self.send_discovery_notification(DiscoveryNotificationType::WriterUpdated {
1829+
discovered_writer_data: writer.clone(),
1830+
});
1831+
}
1832+
}
18321833

1833-
if do_nonsecure_write {
1834-
match self
1835-
.dcps_publication
1836-
.writer
1837-
.write(writer_data.clone(), None)
1838-
{
1839-
Ok(()) => {
1840-
debug!(
1841-
"Published DCPSPublication data on topic {}, writer guid {:?}",
1842-
writer_data.publication_topic_data.topic_name(),
1843-
guid
1844-
);
1845-
}
1846-
Err(e) => {
1847-
error!(
1848-
"Failed to publish DCPSPublication data on topic {}, writer guid {:?}. Error: {e}",
1849-
writer_data.publication_topic_data.topic_name(),
1850-
guid
1851-
);
1852-
// TODO: try again later?
1853-
}
1834+
fn sedp_publish_single_writer(&self, writer_data: &DiscoveredWriterData) {
1835+
if !writer_data
1836+
.writer_proxy
1837+
.remote_writer_guid
1838+
.entity_id
1839+
.kind()
1840+
.is_user_defined()
1841+
{
1842+
// Only writers of user-defined topics are published to discovery
1843+
return;
1844+
}
1845+
1846+
#[cfg(not(feature = "security"))]
1847+
let do_nonsecure_write = true;
1848+
1849+
#[cfg(feature = "security")]
1850+
let do_nonsecure_write = if let Some(security) = self.security_opt.as_ref() {
1851+
security.sedp_publish_single_writer(
1852+
&self.dcps_publication.writer,
1853+
&self.dcps_publications_secure.writer,
1854+
writer_data,
1855+
);
1856+
false
1857+
} else {
1858+
true // No security configured
1859+
};
1860+
1861+
if do_nonsecure_write {
1862+
match self
1863+
.dcps_publication
1864+
.writer
1865+
.write(writer_data.clone(), None)
1866+
{
1867+
Ok(()) => {
1868+
debug!(
1869+
"Published DCPSPublication data on topic {}, writer guid {:?}",
1870+
writer_data.publication_topic_data.topic_name(),
1871+
writer_data.writer_proxy.remote_writer_guid
1872+
);
1873+
}
1874+
Err(e) => {
1875+
error!(
1876+
"Failed to publish DCPSPublication data on topic {}, writer guid {:?}. Error: {e}",
1877+
writer_data.publication_topic_data.topic_name(),
1878+
writer_data.writer_proxy.remote_writer_guid
1879+
);
1880+
// TODO: try again later?
18541881
}
18551882
}
1856-
} else {
1857-
warn!("Did not find a local writer {guid:?}");
18581883
}
18591884
}
18601885

1861-
pub fn sedp_publish_writers(&self) {
1886+
fn sedp_publish_writers(&self) {
18621887
let db: std::sync::RwLockReadGuard<'_, DiscoveryDB> = discovery_db_read(&self.discovery_db);
1863-
let local_user_writer_guids = db
1864-
.get_all_local_topic_writers()
1865-
.filter(|p| {
1866-
p.writer_proxy
1867-
.remote_writer_guid
1868-
.entity_id
1869-
.kind()
1870-
.is_user_defined()
1871-
})
1872-
.map(|drd| drd.writer_proxy.remote_writer_guid);
1873-
1874-
for guid in local_user_writer_guids {
1875-
self.sedp_publish_single_writer(guid);
1888+
for writer in db.get_all_local_topic_writers() {
1889+
self.sedp_publish_single_writer(writer);
18761890
}
18771891
}
18781892

src/discovery/discovery_db.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,36 @@ impl DiscoveryDB {
733733
.collect()
734734
}
735735

736+
pub fn writers_on_topic(&self, topic_name: &str) -> Vec<&DiscoveredWriterData> {
737+
// Get external & local writers on the topic
738+
let exernal_writers = self
739+
.external_topic_writers
740+
.values()
741+
.filter(|wd| wd.publication_topic_data.topic_name() == topic_name);
742+
let local_writers = self
743+
.local_topic_writers
744+
.values()
745+
.filter(|wd| wd.publication_topic_data.topic_name() == topic_name);
746+
// Combine & return
747+
let combined: Vec<&DiscoveredWriterData> = exernal_writers.chain(local_writers).collect();
748+
combined
749+
}
750+
751+
pub fn readers_on_topic(&self, topic_name: &str) -> Vec<&DiscoveredReaderData> {
752+
// Get external & local writers on the topic
753+
let exernal_readers = self
754+
.external_topic_readers
755+
.values()
756+
.filter(|rd| rd.subscription_topic_data.topic_name() == topic_name);
757+
let local_readers = self
758+
.local_topic_readers
759+
.values()
760+
.filter(|rd| rd.subscription_topic_data.topic_name() == topic_name);
761+
// Combine & return
762+
let combined: Vec<&DiscoveredReaderData> = exernal_readers.chain(local_readers).collect();
763+
combined
764+
}
765+
736766
// // TODO: return iterator somehow?
737767
#[cfg(test)] // used only for testing
738768
pub fn get_local_topic_readers<T: TopicDescription>(

0 commit comments

Comments
 (0)