Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ references into your provided buffer:
- `PeerCert(&[u8])`: peer leaf certificate (DER) — validate in your app
- `KeyingMaterial(KeyingMaterial, SrtpProfile)`: DTLS‑SRTP export
- `ApplicationData(&[u8])`: plaintext received from peer
- `CloseNotify`: peer sent a `close_notify` alert (graceful shutdown)

## Example (Sans‑IO loop)

Expand Down Expand Up @@ -106,6 +107,10 @@ fn example_event_loop(mut dtls: Dtls) -> Result<(), dimpl::Error> {
Output::ApplicationData(_data) => {
// Deliver plaintext to application
}
Output::CloseNotify => {
// Peer initiated graceful shutdown
break;
}
_ => {}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/crypto/rust_crypto/kx_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl EcdhKeyExchange {
match group {
NamedGroup::X25519 => {
use rand_core::OsRng;
let secret = x25519_dalek::EphemeralSecret::random_from_rng(&mut OsRng);
let secret = x25519_dalek::EphemeralSecret::random_from_rng(OsRng);
let public_key_obj = x25519_dalek::PublicKey::from(&secret);
buf.clear();
buf.extend_from_slice(public_key_obj.as_bytes());
Expand Down
57 changes: 53 additions & 4 deletions src/dtls12/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ pub struct Client {

/// Data that is sent before we are connected.
queued_data: Vec<Buf>,

/// Whether we have already emitted a CloseNotify output event.
close_notify_reported: bool,
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -106,6 +109,7 @@ impl Client {
last_now: now,
local_events: VecDeque::new(),
queued_data: Vec::new(),
close_notify_reported: false,
}
}

Expand Down Expand Up @@ -153,6 +157,7 @@ impl Client {
last_now: now,
local_events: VecDeque::new(),
queued_data: Vec::new(),
close_notify_reported: false,
};
client.handle_timeout(now)?;
Ok(client)
Expand All @@ -173,13 +178,18 @@ impl Client {
}

pub fn poll_output<'a>(&mut self, buf: &'a mut [u8]) -> Output<'a> {
let last_now = self.last_now;

if let Some(event) = self.local_events.pop_front() {
return event.into_output(buf, &self.server_certificates);
}

self.engine.poll_output(buf, last_now)
let output = self.engine.poll_output(buf, self.last_now);
if matches!(output, Output::Timeout(_))
&& self.engine.close_notify_received()
&& !self.close_notify_reported
{
self.close_notify_reported = true;
return Output::CloseNotify;
}
output
}

/// Explicitly start the handshake process by sending a ClientHello
Expand All @@ -198,6 +208,10 @@ impl Client {
/// This should only be called when the client is in the Running state,
/// after the handshake is complete.
pub fn send_application_data(&mut self, data: &[u8]) -> Result<(), Error> {
if self.state == State::Closed {
return Err(Error::ConnectionClosed);
}

if self.state != State::AwaitApplicationData {
self.queued_data.push(data.to_buf());
return Ok(());
Expand All @@ -213,6 +227,25 @@ impl Client {
Ok(())
}

/// Initiate graceful shutdown by sending a `close_notify` alert.
pub fn close(&mut self) -> Result<(), Error> {
if self.state == State::Closed {
return Ok(());
}
if self.state != State::AwaitApplicationData {
self.engine.abort();
self.state = State::Closed;
return Ok(());
}
self.engine
.create_record(ContentType::Alert, 1, false, |body| {
body.push(1); // level: warning
body.push(0); // description: close_notify
})?;
self.state = State::Closed;
Ok(())
}

fn make_progress(&mut self) -> Result<(), Error> {
loop {
let prev_state = self.state;
Expand Down Expand Up @@ -247,6 +280,7 @@ enum State {
AwaitNewSessionTicket,
AwaitFinished,
AwaitApplicationData,
Closed,
}

impl State {
Expand All @@ -268,6 +302,7 @@ impl State {
State::AwaitNewSessionTicket => "AwaitNewSessionTicket",
State::AwaitFinished => "AwaitFinished",
State::AwaitApplicationData => "AwaitApplicationData",
State::Closed => "Closed",
}
}

Expand All @@ -289,6 +324,7 @@ impl State {
State::AwaitNewSessionTicket => self.await_new_session_ticket(client),
State::AwaitFinished => self.await_finished(client),
State::AwaitApplicationData => self.await_application_data(client),
State::Closed => Ok(self),
}
}

Expand Down Expand Up @@ -1051,6 +1087,19 @@ impl State {
}

fn await_application_data(self, client: &mut Client) -> Result<Self, Error> {
if client.engine.close_notify_received() {
// RFC 5246 §7.2.1: respond with a reciprocal close_notify and
// close down immediately, discarding any pending writes.
client.engine.discard_pending_writes();
client
.engine
.create_record(ContentType::Alert, 1, false, |body| {
body.push(1); // level: warning
body.push(0); // description: close_notify
})?;
return Ok(State::Closed);
}

if !client.queued_data.is_empty() {
debug!(
"Sending queued application data: {}",
Expand Down
115 changes: 112 additions & 3 deletions src/dtls12/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::{Duration, Instant};

use arrayvec::ArrayVec;

use super::queue::{QueueRx, QueueTx};
use crate::buffer::{Buf, BufferPool, TmpBuf};
use crate::crypto::{Aad, Iv, Nonce};
Expand Down Expand Up @@ -88,6 +90,9 @@ pub struct Engine {

/// Whether we are ready to release application data from poll_output.
release_app_data: bool,

/// Whether a close_notify alert has been received from the peer.
close_notify_received: bool,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -140,6 +145,7 @@ impl Engine {
flight_timeout: Timeout::Unarmed,
connect_timeout: Timeout::Unarmed,
release_app_data: false,
close_notify_received: false,
}
}

Expand Down Expand Up @@ -206,11 +212,12 @@ impl Engine {

/// Insert the Incoming using the logic:
///
/// 1. If it is a handshake, sort by the message_seq
/// 2. If it is not a handshake, sort by sequence_number
/// 1. Extract alert records for immediate processing.
/// 2. If it is a handshake, sort by the message_seq
/// 3. If it is not a handshake, sort by sequence_number
///
fn insert_incoming(&mut self, incoming: Incoming) -> Result<(), Error> {
// Capacity guard
// Capacity guard before iterating records.
if self.queue_rx.len() >= self.config.max_queue_rx() {
warn!(
"Receive queue full (max {}): {:?}",
Expand All @@ -220,6 +227,10 @@ impl Engine {
return Err(Error::ReceiveQueueFull);
}

let Some(incoming) = self.extract_alerts(incoming)? else {
return Ok(());
};

// Dispatch to specialized handlers
if incoming.first().first_handshake().is_some() {
self.insert_incoming_handshake(incoming)
Expand All @@ -228,6 +239,83 @@ impl Engine {
}
}

/// Process alert records from the incoming datagram, returning the
/// remaining non-alert records for queuing.
///
/// A single UDP datagram can contain mixed record types, so each
/// record is inspected individually. Alert records are handled
/// per-epoch for authentication, and after receiving close_notify,
/// any further ApplicationData is discarded.
fn extract_alerts(&mut self, incoming: Incoming) -> Result<Option<Incoming>, Error> {
let mut remaining = ArrayVec::new();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal throughout dimpl is to avoid extra buffering like this if we can. I.e. here we "pre-process" all the incoming records into a new temporary field.

I'm also in two minds whether this really belongs in a function labelled "insert_incoming" (because it's more like "preprocess").

The code almost feels more related to the decryption pass we do in incoming.rs via the RecordDecrypt trait.

What do you think? Would it sit more naturally in incoming.rs?

for record in incoming.into_records() {
if record.record().content_type == ContentType::Alert {
let epoch = record.record().sequence.epoch;
if epoch == 0 {
if self.peer_encryption_enabled {
// Post-handshake: epoch 0 alerts are unauthenticated, discard
self.buffers_free.push(record.into_buffer());
continue;
}
// During handshake: accept fatal alerts (level==2)
let fragment = record.record().fragment(record.buffer());
if fragment.len() >= 2 && fragment[0] == 2 {
let description = fragment[1];
self.buffers_free.push(record.into_buffer());
return Err(Error::SecurityError(format!(
"Received fatal alert: level=2, description={}",
description
)));
}
// Non-fatal epoch 0 alert during handshake: discard
self.buffers_free.push(record.into_buffer());
continue;
}
if !self.peer_encryption_enabled {
// Epoch ≥ 1 but peer encryption not yet enabled: keep for
// re-parsing after enable_peer_encryption (ciphertext record).
remaining.try_push(record).ok();
continue;
}
// Authenticated alert (epoch ≥ 1, peer encryption enabled)
let fragment = record.record().fragment(record.buffer());
if fragment.len() >= 2 {
let level = fragment[0];
let description = fragment[1];
if description == 0 {
// close_notify: signal graceful shutdown
self.close_notify_received = true;
self.buffers_free.push(record.into_buffer());
continue;
} else if level == 2 {
// Fatal alert (non close_notify)
self.buffers_free.push(record.into_buffer());
return Err(Error::SecurityError(format!(
"Received fatal alert: level={}, description={}",
level, description
)));
}
}
// Warning alerts with non-zero description: discard
self.buffers_free.push(record.into_buffer());
continue;
}

// After close_notify (from this or a prior datagram), discard
// any further ApplicationData — the read half is closed.
if self.close_notify_received
&& record.record().content_type == ContentType::ApplicationData
{
self.buffers_free.push(record.into_buffer());
continue;
}

remaining.try_push(record).ok();
}

Ok(Incoming::from_records(remaining))
}

fn insert_incoming_handshake(&mut self, incoming: Incoming) -> Result<(), Error> {
let first_record = incoming.first();
let handshake = first_record
Expand Down Expand Up @@ -899,6 +987,27 @@ impl Engine {
self.release_app_data = true;
}

/// Whether a close_notify alert has been received from the peer.
pub fn close_notify_received(&self) -> bool {
self.close_notify_received
}

/// Discard all pending outgoing data.
///
/// RFC 5246 §7.2.1: on receiving close_notify, discard any pending writes.
pub fn discard_pending_writes(&mut self) {
self.queue_tx.clear();
}

/// Abort the connection: flush all queued output, retransmission state, and
/// disable timers so that no further packets are emitted.
pub fn abort(&mut self) {
self.queue_tx.clear();
self.flight_saved_records.clear();
self.flight_timeout = Timeout::Disabled;
self.connect_timeout = Timeout::Disabled;
}

/// Pop a buffer from the buffer pool for temporary use
pub(crate) fn pop_buffer(&mut self) -> Buf {
self.buffers_free.pop()
Expand Down
11 changes: 11 additions & 0 deletions src/dtls12/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ impl Incoming {
pub fn into_records(self) -> impl Iterator<Item = Record> {
self.records.records.into_iter()
}

/// Create an Incoming from pre-filtered records.
/// Returns None if records is empty (same invariant as parse_packet).
pub fn from_records(records: ArrayVec<Record, 8>) -> Option<Self> {
if records.is_empty() {
return None;
}
Some(Incoming {
records: Box::new(Records { records }),
})
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this doesn't feel great, recreating the incoming like this. Related to previous comment.

}

impl Incoming {
Expand Down
Loading
Loading