Fix example-unix-domain-socket on non-unix platforms (#919)

* remove unused `axum`'s dependency:`tokio-util`

* fix `examples/todos`'s `async fn todos_index` iter_overeager_cloned

* Add docs to `/examples/async-graphql`, just like other xamples.

* remove `examples/async-graphql` unused dependencies `tracing-subscriber` and `trace`

* `examples/chat` deps `trace` and `tracing-subscriber` never be used. Add trace `debug` to `chat`

* remove `examples/print-request-response` unused dependency `axum-extra`

* remove `examples/prometheus-metrics` unused dependency `axum-extra`

* remove `examples/reverse-proxy` unused dependencies `tracing-subscriber` and `trace`

* `examples/chat` fmt fix

* fix `example-unix-domain-socket` compile error on not-unix platforms

Co-authored-by: zys864 <zys864@qq.com>
Co-authored-by: zys864 <zys864@gmail.com>
This commit is contained in:
zys864 2022-04-08 17:29:52 +08:00 committed by GitHub
parent 3747650ae9
commit a237edb7c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,31 +4,11 @@
//! cargo run -p example-unix-domain-socket //! cargo run -p example-unix-domain-socket
//! ``` //! ```
use axum::{ #[cfg(unix)]
body::Body, #[tokio::main]
extract::connect_info::{self, ConnectInfo}, async fn main() {
http::{Method, Request, StatusCode, Uri}, unix::server().await;
routing::get, }
Router,
};
use futures::ready;
use hyper::{
client::connect::{Connected, Connection},
server::accept::Accept,
};
use std::{
io,
path::PathBuf,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{unix::UCred, UnixListener, UnixStream},
};
use tower::BoxError;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[cfg(not(unix))] #[cfg(not(unix))]
fn main() { fn main() {
@ -36,135 +16,165 @@ fn main() {
} }
#[cfg(unix)] #[cfg(unix)]
#[tokio::main] mod unix {
async fn main() { use axum::{
tracing_subscriber::registry() body::Body,
.with(tracing_subscriber::EnvFilter::new( extract::connect_info::{self, ConnectInfo},
std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), http::{Method, Request, StatusCode, Uri},
)) routing::get,
.with(tracing_subscriber::fmt::layer()) Router,
.init(); };
use futures::ready;
use hyper::{
client::connect::{Connected, Connection},
server::accept::Accept,
};
use std::{
io,
path::PathBuf,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{unix::UCred, UnixListener, UnixStream},
};
use tower::BoxError;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
let path = PathBuf::from("/tmp/axum/helloworld"); pub async fn server() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()),
))
.with(tracing_subscriber::fmt::layer())
.init();
let _ = tokio::fs::remove_file(&path).await; let path = PathBuf::from("/tmp/axum/helloworld");
tokio::fs::create_dir_all(path.parent().unwrap())
.await
.unwrap();
let uds = UnixListener::bind(path.clone()).unwrap(); let _ = tokio::fs::remove_file(&path).await;
tokio::spawn(async { tokio::fs::create_dir_all(path.parent().unwrap())
let app = Router::new().route("/", get(handler));
axum::Server::builder(ServerAccept { uds })
.serve(app.into_make_service_with_connect_info::<UdsConnectInfo>())
.await .await
.unwrap(); .unwrap();
});
let connector = tower::service_fn(move |_: Uri| { let uds = UnixListener::bind(path.clone()).unwrap();
let path = path.clone(); tokio::spawn(async {
Box::pin(async move { let app = Router::new().route("/", get(handler));
let stream = UnixStream::connect(path).await?;
Ok::<_, io::Error>(ClientConnection { stream })
})
});
let client = hyper::Client::builder().build(connector);
let request = Request::builder() axum::Server::builder(ServerAccept { uds })
.method(Method::GET) .serve(app.into_make_service_with_connect_info::<UdsConnectInfo>())
.uri("http://uri-doesnt-matter.com") .await
.body(Body::empty()) .unwrap();
.unwrap(); });
let response = client.request(request).await.unwrap(); let connector = tower::service_fn(move |_: Uri| {
let path = path.clone();
Box::pin(async move {
let stream = UnixStream::connect(path).await?;
Ok::<_, io::Error>(ClientConnection { stream })
})
});
let client = hyper::Client::builder().build(connector);
assert_eq!(response.status(), StatusCode::OK); let request = Request::builder()
.method(Method::GET)
.uri("http://uri-doesnt-matter.com")
.body(Body::empty())
.unwrap();
let body = hyper::body::to_bytes(response.into_body()).await.unwrap(); let response = client.request(request).await.unwrap();
let body = String::from_utf8(body.to_vec()).unwrap();
assert_eq!(body, "Hello, World!");
}
async fn handler(ConnectInfo(info): ConnectInfo<UdsConnectInfo>) -> &'static str { assert_eq!(response.status(), StatusCode::OK);
println!("new connection from `{:?}`", info);
"Hello, World!" let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
} let body = String::from_utf8(body.to_vec()).unwrap();
assert_eq!(body, "Hello, World!");
struct ServerAccept {
uds: UnixListener,
}
impl Accept for ServerAccept {
type Conn = UnixStream;
type Error = BoxError;
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let (stream, _addr) = ready!(self.uds.poll_accept(cx))?;
Poll::Ready(Some(Ok(stream)))
}
}
struct ClientConnection {
stream: UnixStream,
}
impl AsyncWrite for ClientConnection {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { async fn handler(ConnectInfo(info): ConnectInfo<UdsConnectInfo>) -> &'static str {
Pin::new(&mut self.stream).poll_flush(cx) println!("new connection from `{:?}`", info);
"Hello, World!"
} }
fn poll_shutdown( struct ServerAccept {
mut self: Pin<&mut Self>, uds: UnixListener,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.stream).poll_shutdown(cx)
} }
}
impl AsyncRead for ClientConnection { impl Accept for ServerAccept {
fn poll_read( type Conn = UnixStream;
mut self: Pin<&mut Self>, type Error = BoxError;
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>, fn poll_accept(
) -> Poll<io::Result<()>> { self: Pin<&mut Self>,
Pin::new(&mut self.stream).poll_read(cx, buf) cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let (stream, _addr) = ready!(self.uds.poll_accept(cx))?;
Poll::Ready(Some(Ok(stream)))
}
} }
}
impl Connection for ClientConnection { struct ClientConnection {
fn connected(&self) -> Connected { stream: UnixStream,
Connected::new()
} }
}
#[derive(Clone, Debug)] impl AsyncWrite for ClientConnection {
#[allow(dead_code)] fn poll_write(
struct UdsConnectInfo { mut self: Pin<&mut Self>,
peer_addr: Arc<tokio::net::unix::SocketAddr>, cx: &mut Context<'_>,
peer_cred: UCred, buf: &[u8],
} ) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
}
impl connect_info::Connected<&UnixStream> for UdsConnectInfo { fn poll_flush(
fn connect_info(target: &UnixStream) -> Self { mut self: Pin<&mut Self>,
let peer_addr = target.peer_addr().unwrap(); cx: &mut Context<'_>,
let peer_cred = target.peer_cred().unwrap(); ) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.stream).poll_flush(cx)
}
Self { fn poll_shutdown(
peer_addr: Arc::new(peer_addr), mut self: Pin<&mut Self>,
peer_cred, cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.stream).poll_shutdown(cx)
}
}
impl AsyncRead for ClientConnection {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_read(cx, buf)
}
}
impl Connection for ClientConnection {
fn connected(&self) -> Connected {
Connected::new()
}
}
#[derive(Clone, Debug)]
#[allow(dead_code)]
struct UdsConnectInfo {
peer_addr: Arc<tokio::net::unix::SocketAddr>,
peer_cred: UCred,
}
impl connect_info::Connected<&UnixStream> for UdsConnectInfo {
fn connect_info(target: &UnixStream) -> Self {
let peer_addr = target.peer_addr().unwrap();
let peer_cred = target.peer_cred().unwrap();
Self {
peer_addr: Arc::new(peer_addr),
peer_cred,
}
} }
} }
} }