Source code

Revision control

Copy as Markdown

Other Tools

// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::{
cell::RefCell,
fmt::{self, Display},
num::NonZeroUsize,
rc::Rc,
slice,
time::Instant,
};
use neqo_common::{Datagram, Header, header::HeadersExt as _, hex, qdebug, qerror, qinfo};
use neqo_crypto::{AntiReplay, generate_ech_keys, random};
use neqo_http3::{
Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId,
};
use neqo_transport::{ConnectionIdGenerator, OutputBatch, server::ValidateAddress};
use rustc_hash::FxHashMap as HashMap;
use super::{Args, qns_read_response};
use crate::send_data::{SendData, SendResult};
/// HTTP/3 server wrapper that pairs an [`Http3Server`] with the per-stream
/// bookkeeping needed to answer requests: responses that are still being
/// written and POST bodies that are still being received.
pub struct HttpServer {
/// The wrapped HTTP/3 server; events are pulled from it and datagrams are
/// fed to it.
server: Http3Server,
/// Progress writing to each stream.
remaining_data: HashMap<StreamId, SendData>,
/// Tracks POST requests: (bytes received, optional response size from path)
posts: HashMap<Http3OrWebTransportStream, (usize, Option<usize>)>,
/// Set from `args.shared.qns_test.is_some()` at construction. When true,
/// non-POST requests are answered with the result of `qns_read_response`
/// for the request path.
is_qns_test: bool,
}
impl HttpServer {
/// Answer the request on `stream` with a `200` status, a `content-length`
/// header matching `response`, and then as much of the body as can be
/// written right now.
///
/// Any part of the body that could not be written is stored in
/// `remaining_data` under the stream's id. If the headers cannot be sent
/// (e.g., the stream was closed by `STOP_SENDING`), the failure is logged,
/// the send side is reset, and the function returns gracefully per RFC 9000.
fn send_response(
    &mut self,
    stream: &Http3OrWebTransportStream,
    mut response: SendData,
    now: Instant,
) {
    let headers = [
        Header::new(":status", "200"),
        Header::new("content-length", response.len().to_string()),
    ];
    if stream.send_headers(&headers).is_ok() {
        // Keep whatever did not fit so a later write can continue from there.
        let unsent = Self::send_response_body(stream, &mut response, now);
        if let Some(unsent) = unsent {
            self.remaining_data.insert(stream.stream_id(), unsent);
        }
    } else {
        qerror!("Stream {stream} closed by peer, not sending response");
        // Best effort: the stream may already be gone, so ignore errors.
        _ = stream.stream_reset_send(neqo_http3::Error::HttpNone.code());
    }
}
/// Write as much of `response` to `stream` as possible.
///
/// Returns `Some(data)` holding the unsent remainder when more data is left
/// to send (`response` is left in its default state in that case), and
/// `None` when the body is complete or the stream turned out to be closed.
/// On completion the send side is closed; on a closed stream the error is
/// logged and the send side is reset.
fn send_response_body(
    stream: &Http3OrWebTransportStream,
    response: &mut SendData,
    now: Instant,
) -> Option<SendData> {
    let outcome = response.send(|bytes| stream.send_data(bytes, now));
    match outcome {
        SendResult::MoreData => return Some(std::mem::take(response)),
        SendResult::Done => {
            // Stream may be closed; ignore errors.
            _ = stream.stream_close_send(now);
        }
        SendResult::StreamClosed => {
            qerror!("Stream {stream} closed");
            _ = stream.stream_reset_send(neqo_http3::Error::HttpNone.code());
        }
    }
    None
}
/// Build an HTTP/3 server from the command-line `args`.
///
/// The wrapped [`Http3Server`] is created with the key, ALPN, QUIC
/// parameters and QPACK limits taken from `args`, then configured with the
/// requested ciphers and qlog directory. When `args.retry` is set, address
/// validation is set to [`ValidateAddress::Always`]. When `args.ech` is set,
/// fresh ECH keys are generated, ECH is enabled for `public.example`, and
/// the resulting `ECHConfigList` is logged in hex.
///
/// # Panics
///
/// Panics if the [`Http3Server`] cannot be created, or, when `args.ech` is
/// set, if ECH keys cannot be generated or ECH cannot be enabled.
pub fn new(
    args: &Args,
    anti_replay: AntiReplay,
    cid_mgr: Rc<RefCell<dyn ConnectionIdGenerator>>,
) -> Self {
    let mut server = Http3Server::new(
        args.now(),
        slice::from_ref(&args.key),
        slice::from_ref(&args.shared.alpn),
        anti_replay,
        cid_mgr,
        Http3Parameters::default()
            .connection_parameters(args.shared.quic_parameters.get(&args.shared.alpn))
            .max_table_size_encoder(args.shared.max_table_size_encoder)
            .max_table_size_decoder(args.shared.max_table_size_decoder)
            .max_blocked_streams(args.shared.max_blocked_streams),
        None,
    )
    .expect("We cannot make a server!");
    server.set_ciphers(args.get_ciphers());
    server.set_qlog_dir(args.shared.qlog_dir.clone());
    if args.retry {
        server.set_validation(ValidateAddress::Always);
    }
    if args.ech {
        let (sk, pk) = generate_ech_keys().expect("should create ECH keys");
        // A single random byte is used as the ECH config id.
        server
            .enable_ech(random::<1>()[0], "public.example", &sk, &pk)
            .expect("should enable ECH with freshly generated keys");
        qinfo!("ECHConfigList: {}", hex(server.ech_config()));
    }
    Self {
        server,
        remaining_data: HashMap::default(),
        posts: HashMap::default(),
        is_qns_test: args.shared.qns_test.is_some(),
    }
}
}
/// Displays the server exactly as the wrapped `Http3Server` displays itself.
impl Display for HttpServer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward the caller's formatter untouched so any flags still apply.
        Display::fmt(&self.server, f)
    }
}
impl super::HttpServer for HttpServer {
fn process_multiple<'a, D: IntoIterator<Item = Datagram<&'a mut [u8]>>>(
&mut self,
dgrams: D,
now: Instant,
max_datagrams: NonZeroUsize,
) -> OutputBatch {
self.server.process_multiple(dgrams, now, max_datagrams)
}
fn process_events(&mut self, _now: Instant) {
let now = Instant::now();
while let Some(event) = self.server.next_event() {
match event {
Http3ServerEvent::Headers {
stream,
headers,
fin,
} => {
qdebug!("Headers (request={stream} fin={fin}): {headers:?}");
if headers.contains_header(":method", b"POST") {
let response_size = headers.find_header(":path").and_then(|path| {
path.value_utf8()
.ok()?
.trim_matches('/')
.parse::<usize>()
.ok()
});
self.posts.insert(stream, (0, response_size));
continue;
}
let Some(path) = headers.find_header(":path") else {
_ = stream.cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()); // Stream may be closed; ignore errors.
continue;
};
let response = if self.is_qns_test {
let path_str = path.value_utf8().unwrap_or("/");
match qns_read_response(path_str) {
Ok(data) => SendData::from(data),
Err(e) => {
qerror!("Failed to read {path_str}: {e}");
// Stream may be closed; ignore errors.
if stream
.send_headers(&[Header::new(":status", "404")])
.is_ok()
{
_ = stream.stream_close_send(now);
} else {
_ = stream
.stream_reset_send(neqo_http3::Error::HttpNone.code());
}
continue;
}
}
} else if let Ok(path_str) = path.value_utf8() {
path_str
.trim_matches(|p| p == '/')
.parse::<usize>()
.map_or_else(|_| SendData::from(path.value()), SendData::zeroes)
} else {
SendData::from(path.value())
};
self.send_response(&stream, response, now);
}
Http3ServerEvent::DataWritable { stream } => {
if self.posts.get_mut(&stream).is_none()
&& let Some(mut remaining) = self.remaining_data.remove(&stream.stream_id())
&& let Some(data) = Self::send_response_body(&stream, &mut remaining, now)
{
self.remaining_data.insert(stream.stream_id(), data);
}
}
Http3ServerEvent::Data { stream, data, fin } => {
if let Some((received, _)) = self.posts.get_mut(&stream) {
*received += data.len();
}
if fin && let Some((received, response_size)) = self.posts.remove(&stream) {
let response = response_size.map_or_else(
|| SendData::from(received.to_string().into_bytes()),
SendData::zeroes,
);
self.send_response(&stream, response, now);
}
}
_ => {}
}
}
}
fn has_events(&self) -> bool {
self.server.has_events()
}
}