From a237edb7c364fe5a470729c6972124469ca0268b Mon Sep 17 00:00:00 2001 From: zys864 <616561164@qq.com> Date: Fri, 8 Apr 2022 17:29:52 +0800 Subject: [PATCH] Fix `example-unix-domain-socket` on non-unix platforms (#919) * remove unused `axum`'s dependency:`tokio-util` * fix `examples/todos`'s `async fn todos_index` iter_overeager_cloned * Add docs to `/examples/async-graphql`, just like other xamples. * remove `examples/async-graphql` unused dependencies `tracing-subscriber` and `trace` * `examples/chat` deps `trace` and `tracing-subscriber` never be used. Add trace `debug` to `chat` * remove `examples/print-request-response` unused dependency `axum-extra` * remove `examples/prometheus-metrics` unused dependency `axum-extra` * remove `examples/reverse-proxy` unused dependencies `tracing-subscriber` and `trace` * `examples/chat` fmt fix * fix `example-unix-domain-socket` compile error on not-unix platforms Co-authored-by: zys864 Co-authored-by: zys864 --- examples/unix-domain-socket/src/main.rs | 270 ++++++++++++------------ 1 file changed, 140 insertions(+), 130 deletions(-) diff --git a/examples/unix-domain-socket/src/main.rs b/examples/unix-domain-socket/src/main.rs index 46096603..f23734e9 100644 --- a/examples/unix-domain-socket/src/main.rs +++ b/examples/unix-domain-socket/src/main.rs @@ -4,31 +4,11 @@ //! cargo run -p example-unix-domain-socket //! ``` -use axum::{ - body::Body, - extract::connect_info::{self, ConnectInfo}, - http::{Method, Request, StatusCode, Uri}, - routing::get, - Router, -}; -use futures::ready; -use hyper::{ - client::connect::{Connected, Connection}, - server::accept::Accept, -}; -use std::{ - io, - path::PathBuf, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - net::{unix::UCred, UnixListener, UnixStream}, -}; -use tower::BoxError; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +#[cfg(unix)] +#[tokio::main] +async fn main() { + unix::server().await; +} #[cfg(not(unix))] fn main() { @@ -36,135 +16,165 @@ fn main() { } #[cfg(unix)] -#[tokio::main] -async fn main() { - tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new( - std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), - )) - .with(tracing_subscriber::fmt::layer()) - .init(); +mod unix { + use axum::{ + body::Body, + extract::connect_info::{self, ConnectInfo}, + http::{Method, Request, StatusCode, Uri}, + routing::get, + Router, + }; + use futures::ready; + use hyper::{ + client::connect::{Connected, Connection}, + server::accept::Accept, + }; + use std::{ + io, + path::PathBuf, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + }; + use tokio::{ + io::{AsyncRead, AsyncWrite}, + net::{unix::UCred, UnixListener, UnixStream}, + }; + use tower::BoxError; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - let path = PathBuf::from("/tmp/axum/helloworld"); + pub async fn server() { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new( + std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), + )) + .with(tracing_subscriber::fmt::layer()) + .init(); - let _ = tokio::fs::remove_file(&path).await; - tokio::fs::create_dir_all(path.parent().unwrap()) - .await - .unwrap(); + let path = PathBuf::from("/tmp/axum/helloworld"); - let uds = UnixListener::bind(path.clone()).unwrap(); - tokio::spawn(async { - let app = Router::new().route("/", get(handler)); - - axum::Server::builder(ServerAccept { uds }) - .serve(app.into_make_service_with_connect_info::()) + let _ = tokio::fs::remove_file(&path).await; + tokio::fs::create_dir_all(path.parent().unwrap()) .await .unwrap(); - }); - let connector = tower::service_fn(move |_: Uri| { - let path = path.clone(); - Box::pin(async move { - let stream = UnixStream::connect(path).await?; - Ok::<_, io::Error>(ClientConnection { stream }) - }) - }); - let client = hyper::Client::builder().build(connector); + let uds = UnixListener::bind(path.clone()).unwrap(); + tokio::spawn(async { + let app = Router::new().route("/", get(handler)); - let request = Request::builder() - .method(Method::GET) - .uri("http://uri-doesnt-matter.com") - .body(Body::empty()) - .unwrap(); + axum::Server::builder(ServerAccept { uds }) + .serve(app.into_make_service_with_connect_info::()) + .await + .unwrap(); + }); - let response = client.request(request).await.unwrap(); + let connector = tower::service_fn(move |_: Uri| { + let path = path.clone(); + Box::pin(async move { + let stream = UnixStream::connect(path).await?; + Ok::<_, io::Error>(ClientConnection { stream }) + }) + }); + let client = hyper::Client::builder().build(connector); - assert_eq!(response.status(), StatusCode::OK); + let request = Request::builder() + .method(Method::GET) + .uri("http://uri-doesnt-matter.com") + .body(Body::empty()) + .unwrap(); - let body = hyper::body::to_bytes(response.into_body()).await.unwrap(); - let body = String::from_utf8(body.to_vec()).unwrap(); - assert_eq!(body, "Hello, World!"); -} + let response = client.request(request).await.unwrap(); -async fn handler(ConnectInfo(info): ConnectInfo) -> &'static str { - println!("new connection from `{:?}`", info); + assert_eq!(response.status(), StatusCode::OK); - "Hello, World!" -} - -struct ServerAccept { - uds: UnixListener, -} - -impl Accept for ServerAccept { - type Conn = UnixStream; - type Error = BoxError; - - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let (stream, _addr) = ready!(self.uds.poll_accept(cx))?; - Poll::Ready(Some(Ok(stream))) - } -} - -struct ClientConnection { - stream: UnixStream, -} - -impl AsyncWrite for ClientConnection { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.stream).poll_write(cx, buf) + let body = hyper::body::to_bytes(response.into_body()).await.unwrap(); + let body = String::from_utf8(body.to_vec()).unwrap(); + assert_eq!(body, "Hello, World!"); } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_flush(cx) + async fn handler(ConnectInfo(info): ConnectInfo) -> &'static str { + println!("new connection from `{:?}`", info); + + "Hello, World!" } - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.stream).poll_shutdown(cx) + struct ServerAccept { + uds: UnixListener, } -} -impl AsyncRead for ClientConnection { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.stream).poll_read(cx, buf) + impl Accept for ServerAccept { + type Conn = UnixStream; + type Error = BoxError; + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let (stream, _addr) = ready!(self.uds.poll_accept(cx))?; + Poll::Ready(Some(Ok(stream))) + } } -} -impl Connection for ClientConnection { - fn connected(&self) -> Connected { - Connected::new() + struct ClientConnection { + stream: UnixStream, } -} -#[derive(Clone, Debug)] -#[allow(dead_code)] -struct UdsConnectInfo { - peer_addr: Arc, - peer_cred: UCred, -} + impl AsyncWrite for ClientConnection { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.stream).poll_write(cx, buf) + } -impl connect_info::Connected<&UnixStream> for UdsConnectInfo { - fn connect_info(target: &UnixStream) -> Self { - let peer_addr = target.peer_addr().unwrap(); - let peer_cred = target.peer_cred().unwrap(); + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.stream).poll_flush(cx) + } - Self { - peer_addr: Arc::new(peer_addr), - peer_cred, + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.stream).poll_shutdown(cx) + } + } + + impl AsyncRead for ClientConnection { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.stream).poll_read(cx, buf) + } + } + + impl Connection for ClientConnection { + fn connected(&self) -> Connected { + Connected::new() + } + } + + #[derive(Clone, Debug)] + #[allow(dead_code)] + struct UdsConnectInfo { + peer_addr: Arc, + peer_cred: UCred, + } + + impl connect_info::Connected<&UnixStream> for UdsConnectInfo { + fn connect_info(target: &UnixStream) -> Self { + let peer_addr = target.peer_addr().unwrap(); + let peer_cred = target.peer_cred().unwrap(); + + Self { + peer_addr: Arc::new(peer_addr), + peer_cred, + } } } }