mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
208 lines
7.9 KiB
Rust
208 lines
7.9 KiB
Rust
//! A "tiny database" and accompanying protocol
|
|
//!
|
|
//! This example shows the usage of shared state amongst all connected clients,
|
|
//! namely a database of key/value pairs. Each connected client can send a
|
|
//! series of GET/SET commands to query the current value of a key or set the
|
|
//! value of a key.
|
|
//!
|
|
//! This example has a simple protocol you can use to interact with the server.
|
|
//! To run, first run this in one terminal window:
|
|
//!
|
|
//! cargo run --example tinydb
|
|
//!
|
|
//! and next in another windows run:
|
|
//!
|
|
//! cargo run --example connect 127.0.0.1:8080
|
|
//!
|
|
//! In the `connect` window you can type in commands where when you hit enter
|
|
//! you'll get a response from the server for that command. An example session
|
|
//! is:
|
|
//!
|
|
//!
|
|
//! $ cargo run --example connect 127.0.0.1:8080
|
|
//! GET foo
|
|
//! foo = bar
|
|
//! GET FOOBAR
|
|
//! error: no key FOOBAR
|
|
//! SET FOOBAR my awesome string
|
|
//! set FOOBAR = `my awesome string`, previous: None
|
|
//! SET foo tokio
|
|
//! set foo = `tokio`, previous: Some("bar")
|
|
//! GET foo
|
|
//! foo = tokio
|
|
//!
|
|
//! Namely you can issue two forms of commands:
|
|
//!
|
|
//! * `GET $key` - this will fetch the value of `$key` from the database and
|
|
//! return it. The server's database is initially populated with the key `foo`
|
|
//! set to the value `bar`
|
|
//! * `SET $key $value` - this will set the value of `$key` to `$value`,
|
|
//! returning the previous value, if any.
|
|
|
|
#![deny(warnings)]
|
|
|
|
extern crate tokio;
|
|
|
|
use std::collections::HashMap;
|
|
use std::io::BufReader;
|
|
use std::env;
|
|
use std::net::SocketAddr;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use tokio::io::{lines, write_all};
|
|
use tokio::net::TcpListener;
|
|
use tokio::prelude::*;
|
|
|
|
/// The in-memory database shared amongst all clients.
|
|
///
|
|
/// This database will be shared via `Arc`, so to mutate the internal map we're
|
|
/// going to use a `Mutex` for interior mutability.
|
|
struct Database {
|
|
map: Mutex<HashMap<String, String>>,
|
|
}
|
|
|
|
/// Possible requests our clients can send us
|
|
enum Request {
|
|
Get { key: String },
|
|
Set { key: String, value: String },
|
|
}
|
|
|
|
/// Responses to the `Request` commands above
|
|
enum Response {
|
|
Value { key: String, value: String },
|
|
Set { key: String, value: String, previous: Option<String> },
|
|
Error { msg: String },
|
|
}
|
|
|
|
fn main() -> Result<(), Box<std::error::Error>> {
|
|
// Parse the address we're going to run this server on
|
|
// and set up our TCP listener to accept connections.
|
|
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
|
|
let addr = addr.parse::<SocketAddr>()?;
|
|
let listener = TcpListener::bind(&addr).map_err(|_| "failed to bind")?;
|
|
println!("Listening on: {}", addr);
|
|
|
|
// Create the shared state of this server that will be shared amongst all
|
|
// clients. We populate the initial database and then create the `Database`
|
|
// structure. Note the usage of `Arc` here which will be used to ensure that
|
|
// each independently spawned client will have a reference to the in-memory
|
|
// database.
|
|
let mut initial_db = HashMap::new();
|
|
initial_db.insert("foo".to_string(), "bar".to_string());
|
|
let db = Arc::new(Database {
|
|
map: Mutex::new(initial_db),
|
|
});
|
|
|
|
let done = listener.incoming()
|
|
.map_err(|e| println!("error accepting socket; error = {:?}", e))
|
|
.for_each(move |socket| {
|
|
// As with many other small examples, the first thing we'll do is
|
|
// *split* this TCP stream into two separately owned halves. This'll
|
|
// allow us to work with the read and write halves independently.
|
|
let (reader, writer) = socket.split();
|
|
|
|
// Since our protocol is line-based we use `tokio_io`'s `lines` utility
|
|
// to convert our stream of bytes, `reader`, into a `Stream` of lines.
|
|
let lines = lines(BufReader::new(reader));
|
|
|
|
// Here's where the meat of the processing in this server happens. First
|
|
// we see a clone of the database being created, which is creating a
|
|
// new reference for this connected client to use. Also note the `move`
|
|
// keyword on the closure here which moves ownership of the reference
|
|
// into the closure, which we'll need for spawning the client below.
|
|
//
|
|
// The `map` function here means that we'll run some code for all
|
|
// requests (lines) we receive from the client. The actual handling here
|
|
// is pretty simple, first we parse the request and if it's valid we
|
|
// generate a response based on the values in the database.
|
|
let db = db.clone();
|
|
let responses = lines.map(move |line| {
|
|
let request = match Request::parse(&line) {
|
|
Ok(req) => req,
|
|
Err(e) => return Response::Error { msg: e },
|
|
};
|
|
|
|
let mut db = db.map.lock().unwrap();
|
|
match request {
|
|
Request::Get { key } => {
|
|
match db.get(&key) {
|
|
Some(value) => Response::Value { key, value: value.clone() },
|
|
None => Response::Error { msg: format!("no key {}", key) },
|
|
}
|
|
}
|
|
Request::Set { key, value } => {
|
|
let previous = db.insert(key.clone(), value.clone());
|
|
Response::Set { key, value, previous }
|
|
}
|
|
}
|
|
});
|
|
|
|
// At this point `responses` is a stream of `Response` types which we
|
|
// now want to write back out to the client. To do that we use
|
|
// `Stream::fold` to perform a loop here, serializing each response and
|
|
// then writing it out to the client.
|
|
let writes = responses.fold(writer, |writer, response| {
|
|
let mut response = response.serialize();
|
|
response.push('\n');
|
|
write_all(writer, response.into_bytes()).map(|(w, _)| w)
|
|
});
|
|
|
|
// Like with other small servers, we'll `spawn` this client to ensure it
|
|
// runs concurrently with all other clients, for now ignoring any errors
|
|
// that we see.
|
|
let msg = writes.then(move |_| Ok(()));
|
|
|
|
tokio::spawn(msg)
|
|
});
|
|
|
|
tokio::run(done);
|
|
Ok(())
|
|
}
|
|
|
|
impl Request {
|
|
fn parse(input: &str) -> Result<Request, String> {
|
|
let mut parts = input.splitn(3, " ");
|
|
match parts.next() {
|
|
Some("GET") => {
|
|
let key = match parts.next() {
|
|
Some(key) => key,
|
|
None => return Err(format!("GET must be followed by a key")),
|
|
};
|
|
if parts.next().is_some() {
|
|
return Err(format!("GET's key must not be followed by anything"))
|
|
}
|
|
Ok(Request::Get { key: key.to_string() })
|
|
}
|
|
Some("SET") => {
|
|
let key = match parts.next() {
|
|
Some(key) => key,
|
|
None => return Err(format!("SET must be followed by a key")),
|
|
};
|
|
let value = match parts.next() {
|
|
Some(value) => value,
|
|
None => return Err(format!("SET needs a value")),
|
|
};
|
|
Ok(Request::Set { key: key.to_string(), value: value.to_string() })
|
|
}
|
|
Some(cmd) => Err(format!("unknown command: {}", cmd)),
|
|
None => Err(format!("empty input")),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Response {
|
|
fn serialize(&self) -> String {
|
|
match *self {
|
|
Response::Value { ref key, ref value } => {
|
|
format!("{} = {}", key, value)
|
|
}
|
|
Response::Set { ref key, ref value, ref previous } => {
|
|
format!("set {} = `{}`, previous: {:?}", key, value, previous)
|
|
}
|
|
Response::Error { ref msg } => {
|
|
format!("error: {}", msg)
|
|
}
|
|
}
|
|
}
|
|
}
|