From 0102a4b741655d7f6fd64110ceda5177580b3952 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Sun, 1 Jan 2023 21:59:13 +0100 Subject: [PATCH] phy: make consume infallible, move timestamp to receive/transmit. --- examples/tcpdump.rs | 17 ++- src/iface/interface/ethernet.rs | 2 +- src/iface/interface/ipv4.rs | 2 +- src/iface/interface/mod.rs | 30 +++--- src/iface/interface/sixlowpan.rs | 34 +++--- src/iface/interface/tests.rs | 31 +++--- src/phy/fault_injector.rs | 173 +++++++++++++------------------ src/phy/fuzz_injector.rs | 21 ++-- src/phy/loopback.rs | 13 ++- src/phy/mod.rs | 37 ++++--- src/phy/pcap_writer.rs | 79 +++++++------- src/phy/raw_socket.rs | 20 ++-- src/phy/tracer.rs | 52 ++++------ src/phy/tuntap_interface.rs | 21 ++-- 14 files changed, 244 insertions(+), 288 deletions(-) diff --git a/examples/tcpdump.rs b/examples/tcpdump.rs index 6bdf3597..2baf376e 100644 --- a/examples/tcpdump.rs +++ b/examples/tcpdump.rs @@ -10,15 +10,12 @@ fn main() { let mut socket = RawSocket::new(ifname.as_ref(), smoltcp::phy::Medium::Ethernet).unwrap(); loop { phy_wait(socket.as_raw_fd(), None).unwrap(); - let (rx_token, _) = socket.receive().unwrap(); - rx_token - .consume(Instant::now(), |buffer| { - println!( - "{}", - PrettyPrinter::>::new("", &buffer) - ); - Ok(()) - }) - .unwrap(); + let (rx_token, _) = socket.receive(Instant::now()).unwrap(); + rx_token.consume(|buffer| { + println!( + "{}", + PrettyPrinter::>::new("", &buffer) + ); + }) } } diff --git a/src/iface/interface/ethernet.rs b/src/iface/interface/ethernet.rs index b633c0a4..1e67522d 100644 --- a/src/iface/interface/ethernet.rs +++ b/src/iface/interface/ethernet.rs @@ -69,7 +69,7 @@ impl<'i> InterfaceInner<'i> { F: FnOnce(EthernetFrame<&mut [u8]>), { let tx_len = EthernetFrame::<&[u8]>::buffer_len(buffer_len); - tx_token.consume(self.now, tx_len, |tx_buffer| { + tx_token.consume(tx_len, |tx_buffer| { debug_assert!(tx_buffer.as_ref().len() == tx_len); let mut frame = EthernetFrame::new_unchecked(tx_buffer); diff --git a/src/iface/interface/ipv4.rs b/src/iface/interface/ipv4.rs index fcbde474..0e69df73 100644 --- a/src/iface/interface/ipv4.rs +++ b/src/iface/interface/ipv4.rs @@ -469,7 +469,7 @@ impl<'a> InterfaceInner<'a> { Ok(()) }; - tx_token.consume(self.now, tx_len, |mut tx_buffer| { + tx_token.consume(tx_len, |mut tx_buffer| { #[cfg(feature = "medium-ethernet")] if matches!(self.caps.medium, Medium::Ethernet) { emit_ethernet(&IpRepr::Ipv4(*repr), tx_buffer)?; diff --git a/src/iface/interface/mod.rs b/src/iface/interface/mod.rs index 8eec9259..a4fc50fc 100644 --- a/src/iface/interface/mod.rs +++ b/src/iface/interface/mod.rs @@ -927,7 +927,7 @@ impl<'a> Interface<'a> { } else if let Some(pkt) = self.inner.igmp_report_packet(IgmpVersion::Version2, addr) { // Send initial membership report - let tx_token = device.transmit().ok_or(Error::Exhausted)?; + let tx_token = device.transmit(timestamp).ok_or(Error::Exhausted)?; self.inner.dispatch_ip(tx_token, pkt, None)?; Ok(true) } else { @@ -963,7 +963,7 @@ impl<'a> Interface<'a> { Ok(false) } else if let Some(pkt) = self.inner.igmp_leave_packet(addr) { // Send group leave packet - let tx_token = device.transmit().ok_or(Error::Exhausted)?; + let tx_token = device.transmit(timestamp).ok_or(Error::Exhausted)?; self.inner.dispatch_ip(tx_token, pkt, None)?; Ok(true) } else { @@ -1161,8 +1161,8 @@ impl<'a> Interface<'a> { out_packets: _out_packets, } = self; - while let Some((rx_token, tx_token)) = device.receive() { - let res = rx_token.consume(inner.now, |frame| { + while let Some((rx_token, tx_token)) = device.receive(inner.now) { + rx_token.consume(|frame| { match inner.caps.medium { #[cfg(feature = "medium-ethernet")] Medium::Ethernet => { @@ -1195,12 +1195,7 @@ impl<'a> Interface<'a> { } } processed_any = true; - Ok(()) }); - - if let Err(err) = res { - net_debug!("Failed to consume RX token: {}", err); - } } processed_any @@ -1229,7 +1224,7 @@ impl<'a> Interface<'a> { let mut neighbor_addr = None; let mut respond = |inner: &mut InterfaceInner, response: IpPacket| { neighbor_addr = Some(response.ip_repr().dst_addr()); - let t = device.transmit().ok_or_else(|| { + let t = device.transmit(inner.now).ok_or_else(|| { net_debug!("failed to transmit IP: {}", Error::Exhausted); Error::Exhausted })?; @@ -1328,7 +1323,7 @@ impl<'a> Interface<'a> { } if self.inner.now >= timeout => { if let Some(pkt) = self.inner.igmp_report_packet(version, group) { // Send initial membership report - let tx_token = device.transmit().ok_or(Error::Exhausted)?; + let tx_token = device.transmit(self.inner.now).ok_or(Error::Exhausted)?; self.inner.dispatch_ip(tx_token, pkt, None)?; } @@ -1352,7 +1347,8 @@ impl<'a> Interface<'a> { Some(addr) => { if let Some(pkt) = self.inner.igmp_report_packet(version, addr) { // Send initial membership report - let tx_token = device.transmit().ok_or(Error::Exhausted)?; + let tx_token = + device.transmit(self.inner.now).ok_or(Error::Exhausted)?; self.inner.dispatch_ip(tx_token, pkt, None)?; } @@ -1402,7 +1398,7 @@ impl<'a> Interface<'a> { } = &self.out_packets.ipv4_out_packet; if *packet_len > *sent_bytes { - match device.transmit() { + match device.transmit(self.inner.now) { Some(tx_token) => self .inner .dispatch_ipv4_out_packet(tx_token, &mut self.out_packets.ipv4_out_packet), @@ -1440,7 +1436,7 @@ impl<'a> Interface<'a> { } = &self.out_packets.sixlowpan_out_packet; if *packet_len > *sent_bytes { - match device.transmit() { + match device.transmit(self.inner.now) { Some(tx_token) => self.inner.dispatch_ieee802154_out_packet( tx_token, &mut self.out_packets.sixlowpan_out_packet, @@ -2246,7 +2242,7 @@ impl<'a> InterfaceInner<'a> { } // Transmit the first packet. - tx_token.consume(self.now, tx_len, |mut tx_buffer| { + tx_token.consume(tx_len, |mut tx_buffer| { #[cfg(feature = "medium-ethernet")] if matches!(self.caps.medium, Medium::Ethernet) { emit_ethernet(&ip_repr, tx_buffer)?; @@ -2271,7 +2267,7 @@ impl<'a> InterfaceInner<'a> { } } else { // No fragmentation is required. - tx_token.consume(self.now, total_len, |mut tx_buffer| { + tx_token.consume(total_len, |mut tx_buffer| { #[cfg(feature = "medium-ethernet")] if matches!(self.caps.medium, Medium::Ethernet) { emit_ethernet(&ip_repr, tx_buffer)?; @@ -2285,7 +2281,7 @@ impl<'a> InterfaceInner<'a> { } // We don't support IPv6 fragmentation yet. #[cfg(feature = "proto-ipv6")] - IpRepr::Ipv6(_) => tx_token.consume(self.now, total_len, |mut tx_buffer| { + IpRepr::Ipv6(_) => tx_token.consume(total_len, |mut tx_buffer| { #[cfg(feature = "medium-ethernet")] if matches!(self.caps.medium, Medium::Ethernet) { emit_ethernet(&ip_repr, tx_buffer)?; diff --git a/src/iface/interface/sixlowpan.rs b/src/iface/interface/sixlowpan.rs index 84dacecb..8c6f8a35 100644 --- a/src/iface/interface/sixlowpan.rs +++ b/src/iface/interface/sixlowpan.rs @@ -488,27 +488,22 @@ impl<'a> InterfaceInner<'a> { *sent_bytes = frag1_size; *datagram_offset = frag1_size + header_diff; - tx_token.consume( - self.now, - ieee_len + frag1.buffer_len() + frag1_size, - |mut tx_buf| { - // Add the IEEE header. - let mut ieee_packet = - Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]); - ieee_repr.emit(&mut ieee_packet); - tx_buf = &mut tx_buf[ieee_len..]; + tx_token.consume(ieee_len + frag1.buffer_len() + frag1_size, |mut tx_buf| { + // Add the IEEE header. + let mut ieee_packet = Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]); + ieee_repr.emit(&mut ieee_packet); + tx_buf = &mut tx_buf[ieee_len..]; - // Add the first fragment header - let mut frag1_packet = SixlowpanFragPacket::new_unchecked(&mut tx_buf); - frag1.emit(&mut frag1_packet); - tx_buf = &mut tx_buf[frag1.buffer_len()..]; + // Add the first fragment header + let mut frag1_packet = SixlowpanFragPacket::new_unchecked(&mut tx_buf); + frag1.emit(&mut frag1_packet); + tx_buf = &mut tx_buf[frag1.buffer_len()..]; - // Add the buffer part. - tx_buf[..frag1_size].copy_from_slice(&buffer[..frag1_size]); + // Add the buffer part. + tx_buf[..frag1_size].copy_from_slice(&buffer[..frag1_size]); - Ok(()) - }, - ) + Ok(()) + }) } #[cfg(not(feature = "proto-sixlowpan-fragmentation"))] @@ -520,7 +515,7 @@ impl<'a> InterfaceInner<'a> { } } else { // We don't need fragmentation, so we emit everything to the TX token. - tx_token.consume(self.now, total_size + ieee_len, |mut tx_buf| { + tx_token.consume(total_size + ieee_len, |mut tx_buf| { let mut ieee_packet = Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]); ieee_repr.emit(&mut ieee_packet); tx_buf = &mut tx_buf[ieee_len..]; @@ -623,7 +618,6 @@ impl<'a> InterfaceInner<'a> { let frag_size = (*packet_len - *sent_bytes).min(*fragn_size); tx_token.consume( - self.now, ieee_repr.buffer_len() + fragn.buffer_len() + frag_size, |mut tx_buf| { let mut ieee_packet = Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]); diff --git a/src/iface/interface/tests.rs b/src/iface/interface/tests.rs index 19b8647e..32dde588 100644 --- a/src/iface/interface/tests.rs +++ b/src/iface/interface/tests.rs @@ -9,7 +9,6 @@ use crate::iface::NeighborCache; use crate::phy::{ChecksumCapabilities, Loopback}; #[cfg(feature = "proto-igmp")] use crate::time::Instant; -use crate::{Error, Result}; #[allow(unused)] fn fill_slice(s: &mut [u8], val: u8) { @@ -129,12 +128,10 @@ fn create_ieee802154<'a>() -> (Interface<'a>, SocketSet<'a>, Loopback) { #[cfg(feature = "proto-igmp")] fn recv_all(device: &mut Loopback, timestamp: Instant) -> Vec> { let mut pkts = Vec::new(); - while let Some((rx, _tx)) = device.receive() { - rx.consume(timestamp, |pkt| { + while let Some((rx, _tx)) = device.receive(timestamp) { + rx.consume(|pkt| { pkts.push(pkt.to_vec()); - Ok(()) - }) - .unwrap(); + }); } pkts } @@ -144,11 +141,12 @@ fn recv_all(device: &mut Loopback, timestamp: Instant) -> Vec> { struct MockTxToken; impl TxToken for MockTxToken { - fn consume(self, _: Instant, _: usize, _: F) -> Result + fn consume(self, len: usize, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { - Err(Error::Unaddressable) + let mut junk = [0; 1536]; + f(&mut junk[..len]) } } @@ -1206,13 +1204,10 @@ fn test_handle_igmp() { ]; { // Transmit GENERAL_QUERY_BYTES into loopback - let tx_token = device.transmit().unwrap(); - tx_token - .consume(timestamp, GENERAL_QUERY_BYTES.len(), |buffer| { - buffer.copy_from_slice(GENERAL_QUERY_BYTES); - Ok(()) - }) - .unwrap(); + let tx_token = device.transmit(timestamp).unwrap(); + tx_token.consume(GENERAL_QUERY_BYTES.len(), |buffer| { + buffer.copy_from_slice(GENERAL_QUERY_BYTES); + }); } // Trigger processing until all packets received through the // loopback have been processed, including responses to @@ -1562,7 +1557,7 @@ fn test_echo_request_sixlowpan_128_bytes() { Instant::now(), ); - let tx_token = device.transmit().unwrap(); + let tx_token = device.transmit(Instant::now()).unwrap(); iface .inner .dispatch_ieee802154( @@ -1702,7 +1697,7 @@ fn test_sixlowpan_udp_with_fragmentation() { )) ); - let tx_token = device.transmit().unwrap(); + let tx_token = device.transmit(Instant::now()).unwrap(); iface .inner .dispatch_ieee802154( diff --git a/src/phy/fault_injector.rs b/src/phy/fault_injector.rs index d5e22c48..64d21714 100644 --- a/src/phy/fault_injector.rs +++ b/src/phy/fault_injector.rs @@ -1,8 +1,5 @@ -use core::cell::RefCell; - use crate::phy::{self, Device, DeviceCapabilities}; use crate::time::{Duration, Instant}; -use crate::{Error, Result}; // We use our own RNG to stay compatible with #![no_std]. // The use of the RNG below has a slight bias, but it doesn't matter. @@ -96,23 +93,24 @@ impl State { #[derive(Debug)] pub struct FaultInjector { inner: D, - state: RefCell, + state: State, config: Config, + rx_buf: [u8; MTU], } impl FaultInjector { /// Create a fault injector device, using the given random number generator seed. pub fn new(inner: D, seed: u32) -> FaultInjector { - let state = State { - rng_seed: seed, - refilled_at: Instant::from_millis(0), - tx_bucket: 0, - rx_bucket: 0, - }; FaultInjector { inner, - state: RefCell::new(state), + state: State { + rng_seed: seed, + refilled_at: Instant::from_millis(0), + tx_bucket: 0, + rx_bucket: 0, + }, config: Config::default(), + rx_buf: [0u8; MTU], } } @@ -190,13 +188,13 @@ impl FaultInjector { /// Set the interval for packet rate limiting, in milliseconds. pub fn set_bucket_interval(&mut self, interval: Duration) { - self.state.borrow_mut().refilled_at = Instant::from_millis(0); + self.state.refilled_at = Instant::from_millis(0); self.config.interval = interval } } impl Device for FaultInjector { - type RxToken<'a> = RxToken<'a, D::RxToken<'a>> + type RxToken<'a> = RxToken<'a> where Self: 'a; type TxToken<'a> = TxToken<'a, D::TxToken<'a>> @@ -211,117 +209,94 @@ impl Device for FaultInjector { caps } - fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { - let &mut Self { - ref mut inner, - ref state, - config, - } = self; - inner.receive().map(|(rx_token, tx_token)| { - let rx = RxToken { - state, - config, - token: rx_token, - corrupt: [0; MTU], - }; - let tx = TxToken { - state, - config, - token: tx_token, - junk: [0; MTU], - }; - (rx, tx) - }) + fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + let (rx_token, tx_token) = self.inner.receive(timestamp)?; + + let len = super::RxToken::consume(rx_token, |buffer| { + if (self.config.max_size > 0 && buffer.len() > self.config.max_size) + || buffer.len() > self.rx_buf.len() + { + net_trace!("rx: dropping a packet that is too large"); + return None; + } + self.rx_buf[..buffer.len()].copy_from_slice(buffer); + Some(buffer.len()) + })?; + + let buf = &mut self.rx_buf[..len]; + + if self.state.maybe(self.config.drop_pct) { + net_trace!("rx: randomly dropping a packet"); + return None; + } + + if !self.state.maybe_receive(&self.config, timestamp) { + net_trace!("rx: dropping a packet because of rate limiting"); + return None; + } + + if self.state.maybe(self.config.corrupt_pct) { + net_trace!("rx: randomly corrupting a packet"); + self.state.corrupt(&mut buf[..]); + } + + let rx = RxToken { buf }; + let tx = TxToken { + state: &mut self.state, + config: self.config, + token: tx_token, + junk: [0; MTU], + timestamp, + }; + Some((rx, tx)) } - fn transmit(&mut self) -> Option> { - let &mut Self { - ref mut inner, - ref state, - config, - } = self; - inner.transmit().map(|token| TxToken { - state, - config, + fn transmit(&mut self, timestamp: Instant) -> Option> { + self.inner.transmit(timestamp).map(|token| TxToken { + state: &mut self.state, + config: self.config, token, junk: [0; MTU], + timestamp, }) } } #[doc(hidden)] -pub struct RxToken<'a, Rx: phy::RxToken> { - state: &'a RefCell, - config: Config, - token: Rx, - corrupt: [u8; MTU], +pub struct RxToken<'a> { + buf: &'a mut [u8], } -impl<'a, Rx: phy::RxToken> phy::RxToken for RxToken<'a, Rx> { - fn consume(self, timestamp: Instant, f: F) -> Result +impl<'a> phy::RxToken for RxToken<'a> { + fn consume(self, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { - if self.state.borrow_mut().maybe(self.config.drop_pct) { - net_trace!("rx: randomly dropping a packet"); - return Err(Error::Exhausted); - } - if !self - .state - .borrow_mut() - .maybe_receive(&self.config, timestamp) - { - net_trace!("rx: dropping a packet because of rate limiting"); - return Err(Error::Exhausted); - } - let Self { - token, - config, - state, - mut corrupt, - } = self; - token.consume(timestamp, |buffer| { - if config.max_size > 0 && buffer.as_ref().len() > config.max_size { - net_trace!("rx: dropping a packet that is too large"); - return Err(Error::Exhausted); - } - if state.borrow_mut().maybe(config.corrupt_pct) { - net_trace!("rx: randomly corrupting a packet"); - let mut corrupt = &mut corrupt[..buffer.len()]; - corrupt.copy_from_slice(buffer); - state.borrow_mut().corrupt(&mut corrupt); - f(corrupt) - } else { - f(buffer) - } - }) + f(self.buf) } } #[doc(hidden)] pub struct TxToken<'a, Tx: phy::TxToken> { - state: &'a RefCell, + state: &'a mut State, config: Config, token: Tx, junk: [u8; MTU], + timestamp: Instant, } impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> { - fn consume(mut self, timestamp: Instant, len: usize, f: F) -> Result + fn consume(mut self, len: usize, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { - let drop = if self.state.borrow_mut().maybe(self.config.drop_pct) { + let drop = if self.state.maybe(self.config.drop_pct) { net_trace!("tx: randomly dropping a packet"); true } else if self.config.max_size > 0 && len > self.config.max_size { net_trace!("tx: dropping a packet that is too large"); true - } else if !self - .state - .borrow_mut() - .maybe_transmit(&self.config, timestamp) - { + } else if !self.state.maybe_transmit(&self.config, self.timestamp) { net_trace!("tx: dropping a packet because of rate limiting"); true } else { @@ -332,16 +307,10 @@ impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> { return f(&mut self.junk[..len]); } - let Self { - token, - state, - config, - .. - } = self; - token.consume(timestamp, len, |mut buf| { - if state.borrow_mut().maybe(config.corrupt_pct) { + self.token.consume(len, |mut buf| { + if self.state.maybe(self.config.corrupt_pct) { net_trace!("tx: corrupting a packet"); - state.borrow_mut().corrupt(&mut buf) + self.state.corrupt(&mut buf) } f(buf) }) diff --git a/src/phy/fuzz_injector.rs b/src/phy/fuzz_injector.rs index 02551795..54bc0b9a 100644 --- a/src/phy/fuzz_injector.rs +++ b/src/phy/fuzz_injector.rs @@ -1,6 +1,5 @@ use crate::phy::{self, Device, DeviceCapabilities}; use crate::time::Instant; -use crate::Result; // This could be fixed once associated consts are stable. const MTU: usize = 1536; @@ -62,13 +61,13 @@ where caps } - fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { let &mut Self { ref mut inner, ref fuzz_rx, ref fuzz_tx, } = self; - inner.receive().map(|(rx_token, tx_token)| { + inner.receive(timestamp).map(|(rx_token, tx_token)| { let rx = RxToken { fuzzer: fuzz_rx, token: rx_token, @@ -81,13 +80,13 @@ where }) } - fn transmit(&mut self) -> Option> { + fn transmit(&mut self, timestamp: Instant) -> Option> { let &mut Self { ref mut inner, fuzz_rx: _, ref fuzz_tx, } = self; - inner.transmit().map(|token| TxToken { + inner.transmit(timestamp).map(|token| TxToken { fuzzer: fuzz_tx, token: token, }) @@ -101,12 +100,12 @@ pub struct RxToken<'a, Rx: phy::RxToken, F: Fuzzer + 'a> { } impl<'a, Rx: phy::RxToken, FRx: Fuzzer> phy::RxToken for RxToken<'a, Rx, FRx> { - fn consume(self, timestamp: Instant, f: F) -> Result + fn consume(self, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { let Self { fuzzer, token } = self; - token.consume(timestamp, |buffer| { + token.consume(|buffer| { fuzzer.fuzz_packet(buffer); f(buffer) }) @@ -120,12 +119,12 @@ pub struct TxToken<'a, Tx: phy::TxToken, F: Fuzzer + 'a> { } impl<'a, Tx: phy::TxToken, FTx: Fuzzer> phy::TxToken for TxToken<'a, Tx, FTx> { - fn consume(self, timestamp: Instant, len: usize, f: F) -> Result + fn consume(self, len: usize, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { let Self { fuzzer, token } = self; - token.consume(timestamp, len, |buf| { + token.consume(len, |buf| { let result = f(buf); fuzzer.fuzz_packet(buf); result diff --git a/src/phy/loopback.rs b/src/phy/loopback.rs index 5bf03541..1f57c0ca 100644 --- a/src/phy/loopback.rs +++ b/src/phy/loopback.rs @@ -3,7 +3,6 @@ use alloc::vec::Vec; use crate::phy::{self, Device, DeviceCapabilities, Medium}; use crate::time::Instant; -use crate::Result; /// A loopback device. #[derive(Debug)] @@ -38,7 +37,7 @@ impl Device for Loopback { } } - fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { self.queue.pop_front().map(move |buffer| { let rx = RxToken { buffer }; let tx = TxToken { @@ -48,7 +47,7 @@ impl Device for Loopback { }) } - fn transmit(&mut self) -> Option> { + fn transmit(&mut self, _timestamp: Instant) -> Option> { Some(TxToken { queue: &mut self.queue, }) @@ -61,9 +60,9 @@ pub struct RxToken { } impl phy::RxToken for RxToken { - fn consume(mut self, _timestamp: Instant, f: F) -> Result + fn consume(mut self, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { f(&mut self.buffer) } @@ -76,9 +75,9 @@ pub struct TxToken<'a> { } impl<'a> phy::TxToken for TxToken<'a> { - fn consume(self, _timestamp: Instant, len: usize, f: F) -> Result + fn consume(self, len: usize, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { let mut buffer = Vec::new(); buffer.resize(len, 0); diff --git a/src/phy/mod.rs b/src/phy/mod.rs index 54b109da..018c0eac 100644 --- a/src/phy/mod.rs +++ b/src/phy/mod.rs @@ -42,12 +42,12 @@ impl phy::Device for StmPhy { type RxToken<'a> = StmPhyRxToken<'a> where Self: 'a; type TxToken<'a> = StmPhyTxToken<'a> where Self: 'a; - fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { Some((StmPhyRxToken(&mut self.rx_buffer[..]), StmPhyTxToken(&mut self.tx_buffer[..]))) } - fn transmit(&mut self) -> Option> { + fn transmit(&mut self, _timestamp: Instant) -> Option> { Some(StmPhyTxToken(&mut self.tx_buffer[..])) } @@ -63,8 +63,8 @@ impl phy::Device for StmPhy { struct StmPhyRxToken<'a>(&'a mut [u8]); impl<'a> phy::RxToken for StmPhyRxToken<'a> { - fn consume(mut self, _timestamp: Instant, f: F) -> Result - where F: FnOnce(&mut [u8]) -> Result + fn consume(mut self, f: F) -> R + where F: FnOnce(&mut [u8]) -> R { // TODO: receive packet into buffer let result = f(&mut self.0); @@ -76,8 +76,8 @@ impl<'a> phy::RxToken for StmPhyRxToken<'a> { struct StmPhyTxToken<'a>(&'a mut [u8]); impl<'a> phy::TxToken for StmPhyTxToken<'a> { - fn consume(self, _timestamp: Instant, len: usize, f: F) -> Result - where F: FnOnce(&mut [u8]) -> Result + fn consume(self, len: usize, f: F) -> R + where F: FnOnce(&mut [u8]) -> R { let result = f(&mut self.0[..len]); println!("tx called {}", len); @@ -90,7 +90,6 @@ impl<'a> phy::TxToken for StmPhyTxToken<'a> { )] use crate::time::Instant; -use crate::Result; #[cfg(all( any(feature = "phy-raw_socket", feature = "phy-tuntap_interface"), @@ -322,10 +321,16 @@ pub trait Device { /// on the contents of the received packet. For example, this makes it possible to /// handle arbitrarily large ICMP echo ("ping") requests, where the all received bytes /// need to be sent back, without heap allocation. - fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)>; + /// + /// The timestamp must be a number of milliseconds, monotonically increasing since an + /// arbitrary moment in time, such as system startup. + fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)>; /// Construct a transmit token. - fn transmit(&mut self) -> Option>; + /// + /// The timestamp must be a number of milliseconds, monotonically increasing since an + /// arbitrary moment in time, such as system startup. + fn transmit(&mut self, timestamp: Instant) -> Option>; /// Get a description of device capabilities. fn capabilities(&self) -> DeviceCapabilities; @@ -337,12 +342,9 @@ pub trait RxToken { /// /// This method receives a packet and then calls the given closure `f` with the raw /// packet bytes as argument. - /// - /// The timestamp must be a number of milliseconds, monotonically increasing since an - /// arbitrary moment in time, such as system startup. - fn consume(self, timestamp: Instant, f: F) -> Result + fn consume(self, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result; + F: FnOnce(&mut [u8]) -> R; } /// A token to transmit a single network packet. @@ -353,10 +355,7 @@ pub trait TxToken { /// closure `f` with a mutable reference to that buffer. The closure should construct /// a valid network packet (e.g. an ethernet packet) in the buffer. When the closure /// returns, the transmit buffer is sent out. - /// - /// The timestamp must be a number of milliseconds, monotonically increasing since an - /// arbitrary moment in time, such as system startup. - fn consume(self, timestamp: Instant, len: usize, f: F) -> Result + fn consume(self, len: usize, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result; + F: FnOnce(&mut [u8]) -> R; } diff --git a/src/phy/pcap_writer.rs b/src/phy/pcap_writer.rs index edaf7844..662ec41b 100644 --- a/src/phy/pcap_writer.rs +++ b/src/phy/pcap_writer.rs @@ -6,7 +6,6 @@ use std::io::Write; use crate::phy::{self, Device, DeviceCapabilities}; use crate::time::Instant; -use crate::Result; enum_with_unknown! { /// Captured packet header type. @@ -177,30 +176,37 @@ where self.lower.capabilities() } - fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { - let sink = &self.sink; - let mode = self.mode; - self.lower.receive().map(move |(rx_token, tx_token)| { - let rx = RxToken { - token: rx_token, - sink, - mode, - }; - let tx = TxToken { - token: tx_token, - sink, - mode, - }; - (rx, tx) - }) - } - - fn transmit(&mut self) -> Option> { + fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { let sink = &self.sink; let mode = self.mode; self.lower - .transmit() - .map(move |token| TxToken { token, sink, mode }) + .receive(timestamp) + .map(move |(rx_token, tx_token)| { + let rx = RxToken { + token: rx_token, + sink, + mode, + timestamp, + }; + let tx = TxToken { + token: tx_token, + sink, + mode, + timestamp, + }; + (rx, tx) + }) + } + + fn transmit(&mut self, timestamp: Instant) -> Option> { + let sink = &self.sink; + let mode = self.mode; + self.lower.transmit(timestamp).map(move |token| TxToken { + token, + sink, + mode, + timestamp, + }) } } @@ -209,16 +215,17 @@ pub struct RxToken<'a, Rx: phy::RxToken, S: PcapSink> { token: Rx, sink: &'a RefCell, mode: PcapMode, + timestamp: Instant, } impl<'a, Rx: phy::RxToken, S: PcapSink> phy::RxToken for RxToken<'a, Rx, S> { - fn consume Result>(self, timestamp: Instant, f: F) -> Result { - let Self { token, sink, mode } = self; - token.consume(timestamp, |buffer| { - match mode { - PcapMode::Both | PcapMode::RxOnly => { - sink.borrow_mut().packet(timestamp, buffer.as_ref()) - } + fn consume R>(self, f: F) -> R { + self.token.consume(|buffer| { + match self.mode { + PcapMode::Both | PcapMode::RxOnly => self + .sink + .borrow_mut() + .packet(self.timestamp, buffer.as_ref()), PcapMode::TxOnly => (), } f(buffer) @@ -231,18 +238,20 @@ pub struct TxToken<'a, Tx: phy::TxToken, S: PcapSink> { token: Tx, sink: &'a RefCell, mode: PcapMode, + timestamp: Instant, } impl<'a, Tx: phy::TxToken, S: PcapSink> phy::TxToken for TxToken<'a, Tx, S> { - fn consume(self, timestamp: Instant, len: usize, f: F) -> Result + fn consume(self, len: usize, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { - let Self { token, sink, mode } = self; - token.consume(timestamp, len, |buffer| { + self.token.consume(len, |buffer| { let result = f(buffer); - match mode { - PcapMode::Both | PcapMode::TxOnly => sink.borrow_mut().packet(timestamp, buffer), + match self.mode { + PcapMode::Both | PcapMode::TxOnly => { + self.sink.borrow_mut().packet(self.timestamp, buffer) + } PcapMode::RxOnly => (), }; result diff --git a/src/phy/raw_socket.rs b/src/phy/raw_socket.rs index 0cc45200..beadfdac 100644 --- a/src/phy/raw_socket.rs +++ b/src/phy/raw_socket.rs @@ -6,7 +6,6 @@ use std::vec::Vec; use crate::phy::{self, sys, Device, DeviceCapabilities, Medium}; use crate::time::Instant; -use crate::Result; /// A socket that captures or transmits the complete frame. #[derive(Debug)] @@ -70,7 +69,7 @@ impl Device for RawSocket { } } - fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { let mut lower = self.lower.borrow_mut(); let mut buffer = vec![0; self.mtu]; match lower.recv(&mut buffer[..]) { @@ -87,7 +86,7 @@ impl Device for RawSocket { } } - fn transmit(&mut self) -> Option> { + fn transmit(&mut self, _timestamp: Instant) -> Option> { Some(TxToken { lower: self.lower.clone(), }) @@ -100,9 +99,9 @@ pub struct RxToken { } impl phy::RxToken for RxToken { - fn consume(mut self, _timestamp: Instant, f: F) -> Result + fn consume(mut self, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { f(&mut self.buffer[..]) } @@ -114,17 +113,20 @@ pub struct TxToken { } impl phy::TxToken for TxToken { - fn consume(self, _timestamp: Instant, len: usize, f: F) -> Result + fn consume(self, len: usize, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { let mut lower = self.lower.borrow_mut(); let mut buffer = vec![0; len]; let result = f(&mut buffer); match lower.send(&buffer[..]) { - Ok(_) => result, - Err(err) if err.kind() == io::ErrorKind::WouldBlock => Err(crate::Error::Exhausted), + Ok(_) => {} + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + net_debug!("phy: tx failed due to WouldBlock") + } Err(err) => panic!("{}", err), } + result } } diff --git a/src/phy/tracer.rs b/src/phy/tracer.rs index 934c6f0f..5e9b7df0 100644 --- a/src/phy/tracer.rs +++ b/src/phy/tracer.rs @@ -2,10 +2,7 @@ use core::fmt; use crate::phy::{self, Device, DeviceCapabilities, Medium}; use crate::time::Instant; -use crate::{ - wire::pretty_print::{PrettyIndent, PrettyPrint}, - Result, -}; +use crate::wire::pretty_print::{PrettyIndent, PrettyPrint}; /// A tracer device. /// @@ -56,38 +53,41 @@ impl Device for Tracer { self.inner.capabilities() } - fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { let &mut Self { ref mut inner, writer, .. } = self; let medium = inner.capabilities().medium; - inner.receive().map(|(rx_token, tx_token)| { + inner.receive(timestamp).map(|(rx_token, tx_token)| { let rx = RxToken { token: rx_token, writer, medium, + timestamp, }; let tx = TxToken { token: tx_token, writer, medium, + timestamp, }; (rx, tx) }) } - fn transmit(&mut self) -> Option> { + fn transmit(&mut self, timestamp: Instant) -> Option> { let &mut Self { ref mut inner, writer, } = self; let medium = inner.capabilities().medium; - inner.transmit().map(|tx_token| TxToken { + inner.transmit(timestamp).map(|tx_token| TxToken { token: tx_token, medium, writer, + timestamp, }) } } @@ -97,24 +97,20 @@ pub struct RxToken { token: Rx, writer: fn(Instant, Packet), medium: Medium, + timestamp: Instant, } impl phy::RxToken for RxToken { - fn consume(self, timestamp: Instant, f: F) -> Result + fn consume(self, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { - let Self { - token, - writer, - medium, - } = self; - token.consume(timestamp, |buffer| { - writer( - timestamp, + self.token.consume(|buffer| { + (self.writer)( + self.timestamp, Packet { buffer, - medium, + medium: self.medium, prefix: "<- ", }, ); @@ -128,25 +124,21 @@ pub struct TxToken { token: Tx, writer: fn(Instant, Packet), medium: Medium, + timestamp: Instant, } impl phy::TxToken for TxToken { - fn consume(self, timestamp: Instant, len: usize, f: F) -> Result + fn consume(self, len: usize, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { - let Self { - token, - writer, - medium, - } = self; - token.consume(timestamp, len, |buffer| { + self.token.consume(len, |buffer| { let result = f(buffer); - writer( - timestamp, + (self.writer)( + self.timestamp, Packet { buffer, - medium, + medium: self.medium, prefix: "-> ", }, ); diff --git a/src/phy/tuntap_interface.rs b/src/phy/tuntap_interface.rs index 7bc2aa0d..7f7ae3c0 100644 --- a/src/phy/tuntap_interface.rs +++ b/src/phy/tuntap_interface.rs @@ -6,7 +6,6 @@ use std::vec::Vec; use crate::phy::{self, sys, Device, DeviceCapabilities, Medium}; use crate::time::Instant; -use crate::Result; /// A virtual TUN (IP) or TAP (Ethernet) interface. #[derive(Debug)] @@ -52,7 +51,7 @@ impl Device for TunTapInterface { } } - fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { let mut lower = self.lower.borrow_mut(); let mut buffer = vec![0; self.mtu]; match lower.recv(&mut buffer[..]) { @@ -69,7 +68,7 @@ impl Device for TunTapInterface { } } - fn transmit(&mut self) -> Option> { + fn transmit(&mut self, _timestamp: Instant) -> Option> { Some(TxToken { lower: self.lower.clone(), }) @@ -82,9 +81,9 @@ pub struct RxToken { } impl phy::RxToken for RxToken { - fn consume(mut self, _timestamp: Instant, f: F) -> Result + fn consume(mut self, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { f(&mut self.buffer[..]) } @@ -96,14 +95,20 @@ pub struct TxToken { } impl phy::TxToken for TxToken { - fn consume(self, _timestamp: Instant, len: usize, f: F) -> Result + fn consume(self, len: usize, f: F) -> R where - F: FnOnce(&mut [u8]) -> Result, + F: FnOnce(&mut [u8]) -> R, { let mut lower = self.lower.borrow_mut(); let mut buffer = vec![0; len]; let result = f(&mut buffer); - lower.send(&buffer[..]).unwrap(); + match lower.send(&buffer[..]) { + Ok(_) => {} + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + net_debug!("phy: tx failed due to WouldBlock") + } + Err(err) => panic!("{}", err), + } result } }