- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
- : 0 %
Source code
Revision control
Copy as Markdown
Other Tools
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// Collecting a list of events relevant to whoever is using the Connection.
use std::{cell::RefCell, collections::VecDeque, rc::Rc};
use neqo_common::event::Provider as EventProvider;
use neqo_crypto::ResumptionToken;
use crate::{
AppError, Stats,
connection::State,
quic_datagrams::DatagramTracking,
stream_id::{StreamId, StreamType},
};
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
pub enum OutgoingDatagramOutcome {
DroppedTooBig,
DroppedQueueFull,
Lost,
Acked,
}
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
pub enum ConnectionEvent {
/// Cert authentication needed
AuthenticationNeeded,
/// Encrypted client hello fallback occurred. The certificate for the
/// public name needs to be authenticated.
EchFallbackAuthenticationNeeded {
public_name: String,
},
/// A new uni (read) or bidi stream has been opened by the peer.
NewStream {
stream_id: StreamId,
},
/// Space available in the buffer for an application write to succeed.
SendStreamWritable {
stream_id: StreamId,
},
/// New bytes available for reading.
RecvStreamReadable {
stream_id: StreamId,
},
/// Peer reset the stream.
RecvStreamReset {
stream_id: StreamId,
app_error: AppError,
},
/// Peer has sent `STOP_SENDING`
SendStreamStopSending {
stream_id: StreamId,
app_error: AppError,
},
/// Peer has acked everything sent on the stream.
SendStreamComplete {
stream_id: StreamId,
},
/// Peer increased `MAX_STREAMS`
SendStreamCreatable {
stream_type: StreamType,
},
/// Connection state change.
StateChange(State),
/// The server rejected 0-RTT.
/// This event invalidates all state in streams that has been created.
/// Any data written to streams needs to be written again.
ZeroRttRejected,
ResumptionToken(ResumptionToken),
Datagram(Vec<u8>),
OutgoingDatagramOutcome {
id: u64,
outcome: OutgoingDatagramOutcome,
},
IncomingDatagramDropped,
}
#[derive(Debug, Default, Clone)]
pub struct ConnectionEvents {
events: Rc<RefCell<VecDeque<ConnectionEvent>>>,
}
impl ConnectionEvents {
pub fn authentication_needed(&self) {
self.insert(ConnectionEvent::AuthenticationNeeded);
}
pub fn ech_fallback_authentication_needed(&self, public_name: String) {
self.insert(ConnectionEvent::EchFallbackAuthenticationNeeded { public_name });
}
pub fn new_stream(&self, stream_id: StreamId) {
self.insert(ConnectionEvent::NewStream { stream_id });
}
pub fn recv_stream_readable(&self, stream_id: StreamId) {
self.insert(ConnectionEvent::RecvStreamReadable { stream_id });
}
pub fn recv_stream_reset(&self, stream_id: StreamId, app_error: AppError) {
// If reset, no longer readable.
self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64()));
self.insert(ConnectionEvent::RecvStreamReset {
stream_id,
app_error,
});
}
pub fn send_stream_writable(&self, stream_id: StreamId) {
self.insert(ConnectionEvent::SendStreamWritable { stream_id });
}
pub fn send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError) {
// If stopped, no longer writable.
self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id));
self.insert(ConnectionEvent::SendStreamStopSending {
stream_id,
app_error,
});
}
pub fn send_stream_complete(&self, stream_id: StreamId) {
self.remove(|evt| {
matches!(evt,
ConnectionEvent::SendStreamWritable { stream_id: x } |
ConnectionEvent::SendStreamStopSending { stream_id: x, .. }
if *x == stream_id)
});
self.insert(ConnectionEvent::SendStreamComplete { stream_id });
}
pub fn send_stream_creatable(&self, stream_type: StreamType) {
self.insert(ConnectionEvent::SendStreamCreatable { stream_type });
}
pub fn connection_state_change(&self, state: State) {
// If closing, existing events no longer relevant.
match state {
State::Closing { .. } | State::Closed(_) => self.events.borrow_mut().clear(),
_ => (),
}
self.insert(ConnectionEvent::StateChange(state));
}
pub fn client_resumption_token(&self, token: ResumptionToken) {
self.insert(ConnectionEvent::ResumptionToken(token));
}
pub fn client_0rtt_rejected(&self) {
// If 0rtt rejected, must start over and existing events are no longer
// relevant.
self.events.borrow_mut().clear();
self.insert(ConnectionEvent::ZeroRttRejected);
}
pub fn recv_stream_complete(&self, stream_id: StreamId) {
// If stopped, no longer readable.
self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64()));
}
// The number of datagrams in the events queue is limited to max_queued_datagrams.
// This function ensure this and deletes the oldest datagrams (head-drop) if needed.
fn check_datagram_queued(&self, max_queued_datagrams: usize, stats: &mut Stats) {
let mut queue = self.events.borrow_mut();
let count = queue
.iter()
.filter(|evt| matches!(evt, ConnectionEvent::Datagram(_)))
.count();
if count < max_queued_datagrams {
// Below the limit. No action needed.
return;
}
let first = queue
.iter_mut()
.find(|evt| matches!(evt, ConnectionEvent::Datagram(_)))
.expect("Checked above");
// Remove the oldest (head-drop), replacing it with an
// IncomingDatagramDropped placeholder.
*first = ConnectionEvent::IncomingDatagramDropped;
stats.incoming_datagram_dropped += 1;
}
pub fn add_datagram(&self, max_queued_datagrams: usize, data: &[u8], stats: &mut Stats) {
self.check_datagram_queued(max_queued_datagrams, stats);
self.events
.borrow_mut()
.push_back(ConnectionEvent::Datagram(data.to_vec()));
}
pub fn datagram_outcome(
&self,
dgram_tracker: &DatagramTracking,
outcome: OutgoingDatagramOutcome,
) {
if let DatagramTracking::Id(id) = dgram_tracker {
self.events
.borrow_mut()
.push_back(ConnectionEvent::OutgoingDatagramOutcome { id: *id, outcome });
}
}
fn insert(&self, event: ConnectionEvent) {
let mut q = self.events.borrow_mut();
// Special-case two enums that are not strictly PartialEq equal but that
// we wish to avoid inserting duplicates.
let already_present = match &event {
ConnectionEvent::SendStreamStopSending { stream_id, .. } => q.iter().any(|evt| {
matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. }
if *x == *stream_id)
}),
ConnectionEvent::RecvStreamReset { stream_id, .. } => q.iter().any(|evt| {
matches!(evt, ConnectionEvent::RecvStreamReset { stream_id: x, .. }
if *x == *stream_id)
}),
_ => q.contains(&event),
};
if !already_present {
q.push_back(event);
}
}
fn remove<F>(&self, f: F)
where
F: Fn(&ConnectionEvent) -> bool,
{
self.events.borrow_mut().retain(|evt| !f(evt));
}
}
impl EventProvider for ConnectionEvents {
type Event = ConnectionEvent;
fn has_events(&self) -> bool {
!self.events.borrow().is_empty()
}
fn next_event(&mut self) -> Option<Self::Event> {
self.events.borrow_mut().pop_front()
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use neqo_common::event::Provider as _;
use crate::{CloseReason, ConnectionEvent, ConnectionEvents, Error, State, Stats, StreamId};
#[test]
fn event_culling() {
let mut evts = ConnectionEvents::default();
assert!(!evts.has_events());
evts.client_0rtt_rejected();
assert!(evts.has_events());
evts.client_0rtt_rejected();
assert_eq!(evts.events().count(), 1);
assert_eq!(evts.events().count(), 0);
evts.new_stream(4.into());
evts.new_stream(4.into());
assert_eq!(evts.events().count(), 1);
evts.recv_stream_readable(6.into());
evts.recv_stream_reset(6.into(), 66);
evts.recv_stream_reset(6.into(), 65);
assert_eq!(evts.events().count(), 1);
evts.send_stream_writable(8.into());
evts.send_stream_writable(8.into());
evts.send_stream_stop_sending(8.into(), 55);
evts.send_stream_stop_sending(8.into(), 56);
let events = evts.events().collect::<Vec<_>>();
assert_eq!(events.len(), 1);
assert_eq!(
events[0],
ConnectionEvent::SendStreamStopSending {
stream_id: StreamId::new(8),
app_error: 55
}
);
evts.send_stream_writable(8.into());
evts.send_stream_writable(8.into());
evts.send_stream_stop_sending(8.into(), 55);
evts.send_stream_stop_sending(8.into(), 56);
evts.send_stream_complete(8.into());
assert_eq!(evts.events().count(), 1);
evts.send_stream_writable(8.into());
evts.send_stream_writable(9.into());
evts.send_stream_stop_sending(10.into(), 55);
evts.send_stream_stop_sending(11.into(), 56);
evts.send_stream_complete(12.into());
assert_eq!(evts.events().count(), 5);
evts.send_stream_writable(8.into());
evts.send_stream_writable(9.into());
evts.send_stream_stop_sending(10.into(), 55);
evts.send_stream_stop_sending(11.into(), 56);
evts.send_stream_complete(12.into());
evts.client_0rtt_rejected();
assert_eq!(evts.events().count(), 1);
evts.send_stream_writable(9.into());
evts.send_stream_stop_sending(10.into(), 55);
evts.connection_state_change(State::Closed(CloseReason::Transport(Error::StreamState)));
assert_eq!(evts.events().count(), 1);
}
#[test]
fn datagram_queue_drops_oldest() {
const MAX_QUEUED: usize = 2;
// Fill the queue to capacity, verify that and that there are no drops yet.
let e = ConnectionEvents::default();
let mut stats = Stats::default();
e.add_datagram(MAX_QUEUED, &[1], &mut stats);
e.add_datagram(MAX_QUEUED, &[2], &mut stats);
assert_eq!(stats.incoming_datagram_dropped, 0);
assert_eq!(e.events.borrow().len(), MAX_QUEUED);
// Add one more datagram - this should drop the oldest ("1").
e.add_datagram(MAX_QUEUED, &[3], &mut stats);
assert_eq!(stats.incoming_datagram_dropped, 1);
// Should have one `IncomingDatagramDropped` event + `MAX_QUEUED` datagrams.
assert_eq!(
e.events.borrow().iter().collect::<Vec<_>>(),
[
&ConnectionEvent::IncomingDatagramDropped,
&ConnectionEvent::Datagram(vec![2]),
&ConnectionEvent::Datagram(vec![3]),
]
);
}
/// Previously `check_datagram_queued` had a bug that caused it to
/// potentially drop an unrelated event.
///
/// See <https://github.com/mozilla/neqo/pull/3105> for details.
#[test]
fn datagram_queue_drops_datagram_not_unrelated_event() {
const MAX_QUEUED: usize = 2;
let e = ConnectionEvents::default();
let mut stats = Stats::default();
// Add unrelated event.
e.new_stream(4.into());
// Fill the queue with datagrams to capacity.
e.add_datagram(MAX_QUEUED, &[1], &mut stats);
e.add_datagram(MAX_QUEUED, &[2], &mut stats);
assert_eq!(stats.incoming_datagram_dropped, 0);
assert_eq!(e.events.borrow().len(), 1 + MAX_QUEUED);
// Add one more datagram - this should drop the oldest ("1"), not the
// unrelated event.
e.add_datagram(MAX_QUEUED, &[3], &mut stats);
assert_eq!(stats.incoming_datagram_dropped, 1);
// Should have one `IncomingDatagramDropped` event + `MAX_QUEUED` datagrams.
assert_eq!(
e.events.borrow().iter().collect::<Vec<_>>(),
[
&ConnectionEvent::NewStream {
stream_id: StreamId::new(4)
},
&ConnectionEvent::IncomingDatagramDropped,
&ConnectionEvent::Datagram(vec![2]),
&ConnectionEvent::Datagram(vec![3]),
]
);
}
}