Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ pub use self::linux::*;
#[cfg(target_os = "macos")]
use self::macos::*;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Default)]
pub struct FullTimestampData {
pub hardware: Option<Timestamp>,
pub software: Option<Timestamp>,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash, Default)]
pub struct Timestamp {
pub seconds: i64,
Expand Down Expand Up @@ -100,6 +106,7 @@ pub struct RecvResult<A> {
pub remote_addr: A,
pub local_addr: A,
pub timestamp: Option<Timestamp>,
pub full_timestamp_data: FullTimestampData,
}

#[derive(Debug)]
Expand Down Expand Up @@ -132,6 +139,7 @@ impl<A: NetworkAddress, S> Socket<A, S> {
socket.receive_message(buf, &mut control_buf, MessageQueue::Normal)?;

let mut timestamp = None;
let mut full_timestamp_data = FullTimestampData::default();
let mut local_addr = self.local_addr;

// Loops through the control messages, but we should only get a single message
Expand All @@ -141,6 +149,12 @@ impl<A: NetworkAddress, S> Socket<A, S> {
ControlMessage::Timestamping { software, hardware } => {
tracing::trace!("Timestamps: {:?} {:?}", software, hardware);
timestamp = select_timestamp(self.timestamp_mode, software, hardware);

// Keep the first timestamp of each kind
full_timestamp_data.software =
full_timestamp_data.software.or(software);
full_timestamp_data.hardware =
full_timestamp_data.hardware.or(hardware);
}

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -175,14 +189,19 @@ impl<A: NetworkAddress, S> Socket<A, S> {
remote_addr,
local_addr,
timestamp,
full_timestamp_data,
})
})
.await
}
}

impl<A: NetworkAddress> Socket<A, Open> {
pub async fn send_to(&mut self, buf: &[u8], addr: A) -> std::io::Result<Option<Timestamp>> {
pub async fn send_to(
&mut self,
buf: &[u8],
addr: A,
) -> std::io::Result<(Option<Timestamp>, FullTimestampData)> {
let addr = addr.to_sockaddr(PrivateToken);

self.socket
Expand All @@ -205,7 +224,7 @@ impl<A: NetworkAddress> Socket<A, Open> {
unreachable!("Should not be able to create send timestamping sockets on platforms other than linux")
}
} else {
Ok(None)
Ok((None, FullTimestampData::default()))
}
}

Expand All @@ -214,7 +233,7 @@ impl<A: NetworkAddress> Socket<A, Open> {
buf: &[u8],
from: A,
to: A,
) -> std::io::Result<Option<Timestamp>> {
) -> std::io::Result<(Option<Timestamp>, FullTimestampData)> {
let from = from.to_sockaddr(PrivateToken);
let to = to.to_sockaddr(PrivateToken);

Expand All @@ -240,7 +259,7 @@ impl<A: NetworkAddress> Socket<A, Open> {
unreachable!("Should not be able to create send timestamping sockets on platforms other than linux")
}
} else {
Ok(None)
Ok((None, FullTimestampData::default()))
}
}

Expand All @@ -264,7 +283,10 @@ impl<A: NetworkAddress> Socket<A, Connected> {
A::from_sockaddr(addr, PrivateToken).ok_or_else(|| std::io::ErrorKind::Other.into())
}

pub async fn send(&mut self, buf: &[u8]) -> std::io::Result<Option<Timestamp>> {
pub async fn send(
&mut self,
buf: &[u8],
) -> std::io::Result<(Option<Timestamp>, FullTimestampData)> {
self.socket
.async_io(Interest::WRITABLE, |socket| socket.send(buf))
.await?;
Expand All @@ -285,11 +307,15 @@ impl<A: NetworkAddress> Socket<A, Connected> {
unreachable!("Should not be able to create send timestamping sockets on platforms other than linux")
}
} else {
Ok(None)
Ok((None, FullTimestampData::default()))
}
}

pub async fn send_from(&mut self, buf: &[u8], from: A) -> std::io::Result<Option<Timestamp>> {
pub async fn send_from(
&mut self,
buf: &[u8],
from: A,
) -> std::io::Result<(Option<Timestamp>, FullTimestampData)> {
let from = from.to_sockaddr(PrivateToken);
self.socket
.async_io(Interest::WRITABLE, |socket| socket.send_from(buf, from))
Expand All @@ -311,7 +337,7 @@ impl<A: NetworkAddress> Socket<A, Connected> {
unreachable!("Should not be able to create send timestamping sockets on platforms other than linux")
}
} else {
Ok(None)
Ok((None, FullTimestampData::default()))
}
}
}
Expand Down
19 changes: 11 additions & 8 deletions src/socket/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
interface::{lookup_phc, InterfaceName},
networkaddress::{sealed::PrivateToken, EthernetAddress, MacAddress, NetworkAddress},
raw_socket::RawSocket,
socket::select_timestamp,
socket::{select_timestamp, FullTimestampData},
};

use super::{InterfaceTimestampMode, Open, Socket, Timestamp};
Expand All @@ -21,28 +21,28 @@ impl<A: NetworkAddress, S> Socket<A, S> {
pub(super) async fn fetch_send_timestamp(
&self,
expected_counter: u32,
) -> std::io::Result<Option<Timestamp>> {
) -> std::io::Result<(Option<Timestamp>, FullTimestampData)> {
use std::time::Duration;

const TIMEOUT: Duration = Duration::from_millis(200);

match tokio::time::timeout(TIMEOUT, self.fetch_send_timestamp_loop(expected_counter)).await
{
Ok(res_opt_timestamp) => res_opt_timestamp,
Err(_timeout_elapsed) => Ok(None),
Err(_timeout_elapsed) => Ok((None, FullTimestampData::default())),
}
}

pub(super) async fn fetch_send_timestamp_loop(
&self,
expected_counter: u32,
) -> std::io::Result<Option<Timestamp>> {
) -> std::io::Result<(Option<Timestamp>, FullTimestampData)> {
let try_read = |_: &RawSocket| self.fetch_send_timestamp_try_read(expected_counter);

loop {
// the timestamp being available triggers the error interest
match self.socket.async_io(Interest::ERROR, try_read).await? {
Some(timestamp) => break Ok(Some(timestamp)),
Some((timestamp, timestamp_data)) => break Ok((Some(timestamp), timestamp_data)),
None => continue,
}
}
Expand All @@ -51,7 +51,7 @@ impl<A: NetworkAddress, S> Socket<A, S> {
pub(super) fn fetch_send_timestamp_try_read(
&self,
expected_counter: u32,
) -> std::io::Result<Option<Timestamp>> {
) -> std::io::Result<Option<(Timestamp, FullTimestampData)>> {
let mut control_buf = [0; EXPECTED_MAX_CMSG_SIZE];

// NOTE: this read could block!
Expand All @@ -65,7 +65,8 @@ impl<A: NetworkAddress, S> Socket<A, S> {
for msg in control_messages {
match msg {
ControlMessage::Timestamping { software, hardware } => {
send_ts = select_timestamp(self.timestamp_mode, software, hardware);
send_ts = select_timestamp(self.timestamp_mode, software, hardware)
.map(|v| (v, FullTimestampData { software, hardware }));
}

ControlMessage::ReceiveError(error) => {
Expand Down Expand Up @@ -124,6 +125,7 @@ pub(super) fn configure_timestamping(
let options = match mode {
InterfaceTimestampMode::HardwareAll | InterfaceTimestampMode::HardwarePTPAll => {
libc::SOF_TIMESTAMPING_RAW_HARDWARE
| libc::SOF_TIMESTAMPING_RX_SOFTWARE
| libc::SOF_TIMESTAMPING_TX_SOFTWARE
| libc::SOF_TIMESTAMPING_RX_HARDWARE
| libc::SOF_TIMESTAMPING_TX_HARDWARE
Expand All @@ -135,6 +137,7 @@ pub(super) fn configure_timestamping(
}
InterfaceTimestampMode::HardwareRecv | InterfaceTimestampMode::HardwarePTPRecv => {
libc::SOF_TIMESTAMPING_RAW_HARDWARE
| libc::SOF_TIMESTAMPING_RX_SOFTWARE
| libc::SOF_TIMESTAMPING_RX_HARDWARE
| bind_phc
.map(|_| SOF_TIMESTAMPING_BIND_PHC)
Expand Down Expand Up @@ -460,7 +463,7 @@ mod tests {
.unwrap();

let before = SystemTime::now();
let send_ts = b.send(&[1, 2, 3]).await.unwrap().unwrap();
let send_ts = b.send(&[1, 2, 3]).await.unwrap().0.unwrap();
let after = SystemTime::now();

let mut buf = [0; 4];
Expand Down
Loading