diff --git a/Cargo.toml b/Cargo.toml index 058eef42..ae74d4f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,6 +115,9 @@ required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interfac [[example]] name = "sixlowpan" required-features = ["std", "medium-ieee802154", "phy-raw_socket", "proto-sixlowpan", "socket-udp"] +[[example]] +name = "sixlowpan_benchmark" +required-features = ["std", "medium-ieee802154", "phy-raw_socket", "proto-sixlowpan", "socket-udp"] [[example]] name = "dns" diff --git a/examples/benchmark.rs b/examples/benchmark.rs index 720894b2..e4d3e27a 100644 --- a/examples/benchmark.rs +++ b/examples/benchmark.rs @@ -81,8 +81,6 @@ fn main() { _ => panic!("invalid mode"), }; - thread::spawn(move || client(mode)); - let neighbor_cache = NeighborCache::new(BTreeMap::new()); let tcp1_rx_buffer = tcp::SocketBuffer::new(vec![0; 65535]); @@ -108,6 +106,7 @@ fn main() { let tcp2_handle = iface.add_socket(tcp2_socket); let default_timeout = Some(Duration::from_millis(1000)); + thread::spawn(move || client(mode)); let mut processed = 0; while !CLIENT_DONE.load(Ordering::SeqCst) { let timestamp = Instant::now(); diff --git a/examples/sixlowpan.rs b/examples/sixlowpan.rs index b31d54e0..d7d975a1 100644 --- a/examples/sixlowpan.rs +++ b/examples/sixlowpan.rs @@ -49,11 +49,11 @@ use std::str; use smoltcp::iface::{FragmentsCache, InterfaceBuilder, NeighborCache}; use smoltcp::phy::{wait as phy_wait, Medium, RawSocket}; +use smoltcp::socket::tcp; use smoltcp::socket::udp; +use smoltcp::storage::RingBuffer; use smoltcp::time::Instant; use smoltcp::wire::{Ieee802154Pan, IpAddress, IpCidr}; -use smoltcp::socket::tcp; -use smoltcp::storage::RingBuffer; fn main() { utils::setup_logging(""); diff --git a/examples/sixlowpan_benchmark.rs b/examples/sixlowpan_benchmark.rs new file mode 100644 index 00000000..6de9df62 --- /dev/null +++ b/examples/sixlowpan_benchmark.rs @@ -0,0 +1,247 @@ +//! 6lowpan benchmark exmaple +//! +//! This example runs a simple TCP throughput benchmark using the 6lowpan implementation in smoltcp +//! It is designed to run using the Linux ieee802154/6lowpan support, +//! using mac802154_hwsim. +//! +//! mac802154_hwsim allows you to create multiple "virtual" radios and specify +//! which is in range with which. This is very useful for testing without +//! needing real hardware. By default it creates two interfaces `wpan0` and +//! `wpan1` that are in range with each other. You can customize this with +//! the `wpan-hwsim` tool. +//! +//! We'll configure Linux to speak 6lowpan on `wpan0`, and leave `wpan1` +//! unconfigured so smoltcp can use it with a raw socket. +//! +//! +//! +//! +//! +//! # Setup +//! +//! modprobe mac802154_hwsim +//! +//! ip link set wpan0 down +//! ip link set wpan1 down +//! iwpan dev wpan0 set pan_id 0xbeef +//! iwpan dev wpan1 set pan_id 0xbeef +//! ip link add link wpan0 name lowpan0 type lowpan +//! ip link set wpan0 up +//! ip link set wpan1 up +//! ip link set lowpan0 up +//! +//! +//! # Running +//! +//! Compile with `cargo build --release --example sixlowpan_benchmark` +//! Run it with `sudo ./target/release/examples/sixlowpan_benchmark [reader|writer]`. +//! +//! # Teardown +//! +//! rmmod mac802154_hwsim +//! + +mod utils; + +use log::debug; +use std::collections::BTreeMap; +use std::os::unix::io::AsRawFd; +use std::str; + +use smoltcp::iface::{FragmentsCache, InterfaceBuilder, NeighborCache}; +use smoltcp::phy::{wait as phy_wait, Medium, RawSocket}; +use smoltcp::socket::tcp; +use smoltcp::storage::RingBuffer; +use smoltcp::wire::{Ieee802154Pan, IpAddress, IpCidr}; + +//For benchmark +use smoltcp::time::{Duration, Instant}; +use std::cmp; +use std::io::{Read, Write}; +use std::net::SocketAddrV6; +use std::net::TcpStream; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; + +use std::fs; + +fn if_nametoindex(ifname: &str) -> u32 { + let contents = fs::read_to_string(format!("/sys/devices/virtual/net/{}/ifindex", ifname)) + .expect("couldn't read interface from \"/sys/devices/virtual/net\"") + .replace("\n", ""); + contents.parse::().unwrap() +} + +const AMOUNT: usize = 100_000_000; + +enum Client { + Reader, + Writer, +} + +fn client(kind: Client) { + let port: u16 = match kind { + Client::Reader => 1234, + Client::Writer => 1235, + }; + + let scope_id = if_nametoindex("lowpan0"); + + let socket_addr = SocketAddrV6::new( + "fe80:0:0:0:180b:4242:4242:4242".parse().unwrap(), + port, + 0, + scope_id, + ); + + let mut stream = TcpStream::connect(socket_addr).expect("failed to connect TLKAGMKA"); + let mut buffer = vec![0; 1_000_000]; + + let start = Instant::now(); + + let mut processed = 0; + while processed < AMOUNT { + let length = cmp::min(buffer.len(), AMOUNT - processed); + let result = match kind { + Client::Reader => stream.read(&mut buffer[..length]), + Client::Writer => stream.write(&buffer[..length]), + }; + match result { + Ok(0) => break, + Ok(result) => { + // print!("(P:{})", result); + processed += result + } + Err(err) => panic!("cannot process: {}", err), + } + } + + let end = Instant::now(); + + let elapsed = (end - start).total_millis() as f64 / 1000.0; + + println!("throughput: {:.3} Gbps", AMOUNT as f64 / elapsed / 0.125e9); + + CLIENT_DONE.store(true, Ordering::SeqCst); +} + +static CLIENT_DONE: AtomicBool = AtomicBool::new(false); + +fn main() { + #[cfg(feature = "log")] + utils::setup_logging("info"); + + let (mut opts, mut free) = utils::create_options(); + utils::add_middleware_options(&mut opts, &mut free); + free.push("MODE"); + + let mut matches = utils::parse_options(&opts, free); + + let device = RawSocket::new("wpan1", Medium::Ieee802154).unwrap(); + + let fd = device.as_raw_fd(); + let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/ false); + + let mode = match matches.free[0].as_ref() { + "reader" => Client::Reader, + "writer" => Client::Writer, + _ => panic!("invalid mode"), + }; + + let neighbor_cache = NeighborCache::new(BTreeMap::new()); + + let tcp1_rx_buffer = tcp::SocketBuffer::new(vec![0; 4096]); + let tcp1_tx_buffer = tcp::SocketBuffer::new(vec![0; 4096]); + let tcp1_socket = tcp::Socket::new(tcp1_rx_buffer, tcp1_tx_buffer); + + let tcp2_rx_buffer = tcp::SocketBuffer::new(vec![0; 4096]); + let tcp2_tx_buffer = tcp::SocketBuffer::new(vec![0; 4096]); + let tcp2_socket = tcp::Socket::new(tcp2_rx_buffer, tcp2_tx_buffer); + + let ieee802154_addr = smoltcp::wire::Ieee802154Address::Extended([ + 0x1a, 0x0b, 0x42, 0x42, 0x42, 0x42, 0x42, 0x42, + ]); + let ip_addrs = [IpCidr::new( + IpAddress::v6(0xfe80, 0, 0, 0, 0x180b, 0x4242, 0x4242, 0x4242), + 64, + )]; + + let cache = FragmentsCache::new(vec![], BTreeMap::new()); + + let buffer: Vec<(usize, managed::ManagedSlice<'_, u8>)> = (0..12) + .into_iter() + .map(|_| (0_usize, managed::ManagedSlice::from(vec![0; 1_000_000_000]))) + .collect(); + + let out_fragments_cache = RingBuffer::new(buffer); + + let mut builder = InterfaceBuilder::new(device, vec![]) + .ip_addrs(ip_addrs) + .pan_id(Ieee802154Pan(0xbeef)); + builder = builder + .hardware_addr(ieee802154_addr.into()) + .neighbor_cache(neighbor_cache) + .sixlowpan_fragments_cache(cache) + .out_fragments_cache(out_fragments_cache); + let mut iface = builder.finalize(); + + let tcp1_handle = iface.add_socket(tcp1_socket); + let tcp2_handle = iface.add_socket(tcp2_socket); + + let default_timeout = Some(Duration::from_millis(1000)); + + thread::spawn(move || client(mode)); + let mut processed = 0; + + while !CLIENT_DONE.load(Ordering::SeqCst) { + let timestamp = Instant::now(); + match iface.poll(timestamp) { + Ok(_) => {} + Err(e) => { + debug!("poll error: {}", e); + } + } + + // tcp:1234: emit data + let socket = iface.get_socket::(tcp1_handle); + if !socket.is_open() { + socket.listen(1234).unwrap(); + } + + if socket.can_send() && processed < AMOUNT { + let length = socket + .send(|buffer| { + let length = cmp::min(buffer.len(), AMOUNT - processed); + (length, length) + }) + .unwrap(); + processed += length; + } + + // tcp:1235: sink data + let socket = iface.get_socket::(tcp2_handle); + if !socket.is_open() { + socket.listen(1235).unwrap(); + } + + if socket.can_recv() && processed < AMOUNT { + let length = socket + .recv(|buffer| { + let length = cmp::min(buffer.len(), AMOUNT - processed); + (length, length) + }) + .unwrap(); + processed += length; + } + + match iface.poll_at(timestamp) { + Some(poll_at) if timestamp < poll_at => { + phy_wait(fd, Some(poll_at - timestamp)).expect("wait error"); + } + Some(_) => (), + None => { + phy_wait(fd, default_timeout).expect("wait error"); + } + } + } +} diff --git a/src/iface/interface.rs b/src/iface/interface.rs index 8a0fc7ce..a1edf673 100644 --- a/src/iface/interface.rs +++ b/src/iface/interface.rs @@ -979,17 +979,24 @@ where let mut neighbor_addr = None; let mut respond = |inner: &mut InterfaceInner, response: IpPacket| { neighbor_addr = Some(response.ip_repr().dst_addr()); - // FIXME(thvdveld): remove unwrap - let tx_token = device.transmit().ok_or(Error::Exhausted).unwrap(); - #[cfg(feature = "proto-sixlowpan")] - { - inner.dispatch_ip(tx_token, response, Some(out_fragments)).unwrap(); + match device.transmit().ok_or(Error::Exhausted) { + Ok(t) => { + #[cfg(feature = "proto-sixlowpan")] + if let Err(_e) = inner.dispatch_ip(t, response, Some(out_fragments)) { + net_debug!("failed to dispatch IP: {}", _e); + } + + #[cfg(not(feature = "proto-sixlowpan"))] + if let Err(_e) = inner.dispatch_ip(t, response, None) { + net_debug!("failed to dispatch IP: {}", _e); + } + emitted_any = true; + } + Err(e) => { + net_debug!("failed to transmit IP: {}", e); + } } - #[cfg(not(feature = "proto-sixlowpan"))] - { - check!(inner.dispatch_ip(tx_token, response, None)); - } - emitted_any = true; + Ok(()) }; @@ -2734,6 +2741,7 @@ impl<'a> InterfaceInner<'a> { let (src_addr, dst_addr) = match (ip_repr.src_addr(), ip_repr.dst_addr()) { (IpAddress::Ipv6(src_addr), IpAddress::Ipv6(dst_addr)) => (src_addr, dst_addr), + #[allow(unreachable_patterns)] _ => return Err(Error::Unaddressable), }; @@ -2905,32 +2913,36 @@ impl<'a> InterfaceInner<'a> { // We need to save the the rest of the packets into the `out_fragments` buffer. while written < total_size { - out_fragments.enqueue_one_with::<'_, (), Error, _>(|(len, tx_buf)| { - let mut tx_buf = &mut tx_buf[..]; + out_fragments + .enqueue_one_with::<'_, (), Error, _>(|(len, tx_buf)| { + let mut tx_buf = &mut tx_buf[..]; - // Modify the sequence number of the IEEE header - ieee_repr.sequence_number = Some(self.get_sequence_number()); - let mut ieee_packet = Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]); - ieee_repr.emit(&mut ieee_packet); - tx_buf = &mut tx_buf[ieee_len..]; + // Modify the sequence number of the IEEE header + ieee_repr.sequence_number = Some(self.get_sequence_number()); + let mut ieee_packet = + Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]); + ieee_repr.emit(&mut ieee_packet); + tx_buf = &mut tx_buf[ieee_len..]; - // Add the next fragment header - let datagram_offset = ((40 + written - ieee_len) / 8) as u8; - fragn.set_offset(datagram_offset); - let mut frag_packet = - SixlowpanFragPacket::new_unchecked(&mut tx_buf[..fragn.buffer_len()]); - fragn.emit(&mut frag_packet); - tx_buf = &mut tx_buf[fragn.buffer_len()..]; + // Add the next fragment header + let datagram_offset = ((40 + written - ieee_len) / 8) as u8; + fragn.set_offset(datagram_offset); + let mut frag_packet = + SixlowpanFragPacket::new_unchecked(&mut tx_buf[..fragn.buffer_len()]); + fragn.emit(&mut frag_packet); + tx_buf = &mut tx_buf[fragn.buffer_len()..]; - // Add the buffer part - let frag_size = (total_size - written).min(fragn_size); - tx_buf[..frag_size].copy_from_slice(&buffer[written..][..frag_size]); - written += frag_size; + // Add the buffer part + let frag_size = (total_size - written).min(fragn_size); + tx_buf[..frag_size].copy_from_slice(&buffer[written..][..frag_size]); + written += frag_size; - // Save the lenght of this packet. - *len = ieee_len + fragn.buffer_len() + frag_size; - Ok(()) - }).unwrap().unwrap(); + // Save the lenght of this packet. + *len = ieee_len + fragn.buffer_len() + frag_size; + Ok(()) + }) + .unwrap() + .unwrap(); } Ok(()) diff --git a/src/wire/sixlowpan.rs b/src/wire/sixlowpan.rs index 8dc8a235..69caadd8 100644 --- a/src/wire/sixlowpan.rs +++ b/src/wire/sixlowpan.rs @@ -325,7 +325,7 @@ pub mod frag { } } - impl<'a, T: AsRef<[u8]> + AsMut<[u8]>> Packet { + impl + AsMut<[u8]>> Packet { fn set_dispatch_field(&mut self, value: u8) { let raw = self.buffer.as_mut(); raw[field::DISPATCH] = (raw[field::DISPATCH] & !(0b11111 << 3)) | (value << 3); @@ -839,7 +839,7 @@ pub mod iphc { } } - impl<'a, T: AsRef<[u8]> + AsMut<[u8]>> Packet { + impl + AsMut<[u8]>> Packet { /// Set the dispatch field to `0b011`. fn set_dispatch_field(&mut self) { let data = &mut self.buffer.as_mut()[field::IPHC_FIELD]; @@ -1506,7 +1506,7 @@ pub mod nhc { } } - impl<'a, T: AsRef<[u8]> + AsMut<[u8]>> ExtHeaderPacket { + impl + AsMut<[u8]>> ExtHeaderPacket { /// Return a mutable pointer to the payload. pub fn payload_mut(&mut self) -> &mut [u8] { let start = 2 + self.next_header_size(); @@ -1781,7 +1781,7 @@ pub mod nhc { } } - impl<'a, T: AsRef<[u8]> + AsMut<[u8]>> UdpNhcPacket { + impl + AsMut<[u8]>> UdpNhcPacket { /// Return a mutable pointer to the payload. pub fn payload_mut(&mut self) -> &mut [u8] { let start = 1 + self.ports_size() + 2; // XXX(thvdveld): we assume we put the checksum inlined.