Initial discovery, balance, and reconnect sketchs (#12)

This commit is contained in:
Carl Lerche
2017-10-05 13:41:44 -07:00
committed by GitHub
parent dffa59d90f
commit 50905b330f
10 changed files with 495 additions and 0 deletions

View File

@@ -18,9 +18,12 @@ readme = "README.md"
members = [
"./",
"tower-balance",
"tower-discover",
"tower-filter",
"tower-mock",
"tower-rate-limit",
"tower-reconnect",
"tower-route",
"tower-timeout",
]

10
tower-balance/Cargo.toml Normal file
View File

@@ -0,0 +1,10 @@
[package]
name = "tower-balance"
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
[dependencies]
futures = "0.1"
tower = { version = "0.1", path = "../" }
tower-discover = { version = "0.1", path = "../tower-discover" }
ordermap = "0.2"

12
tower-balance/README.md Normal file
View File

@@ -0,0 +1,12 @@
Tower Balance
A Tower middleware that load balances across a uniform set of services.
# License
`tower-timeout` is primarily distributed under the terms of both the MIT license
and the Apache License (Version 2.0), with portions covered by various BSD-like
licenses.
See LICENSE-APACHE, and LICENSE-MIT for details.

190
tower-balance/src/lib.rs Normal file
View File

@@ -0,0 +1,190 @@
extern crate futures;
extern crate tower;
extern crate tower_discover;
extern crate ordermap;
use futures::{Future, Poll, Async};
use tower::Service;
use tower_discover::Discover;
use ordermap::OrderMap;
/// Balances requests across a set of inner services using a round-robin
/// strategy.
pub struct Balance<T>
where T: Discover,
{
balancer: RoundRobin<T>,
}
/// Error produced by `Balance`
pub enum Error<T, U> {
Inner(T),
Balance(U),
NotReady,
}
pub struct ResponseFuture<T>
where T: Discover,
{
inner: <T::Service as Service>::Future,
}
/// Round-robin based load balancing
struct RoundRobin<T>
where T: Discover,
{
/// The service discovery handle
discover: T,
/// The endpoints managed by the balancer
endpoints: OrderMap<T::Key, T::Service>,
/// Balancer entry to use when handling the next request
pos: usize,
}
// ===== impl Balance =====
impl<T> Balance<T>
where T: Discover,
{
/// Create a new balancer
pub fn new(discover: T) -> Self {
Balance {
balancer: RoundRobin {
discover,
endpoints: OrderMap::new(),
pos: 0,
},
}
}
}
impl<T> Service for Balance<T>
where T: Discover,
{
type Request = T::Request;
type Response = T::Response;
type Error = Error<T::Error, T::DiscoverError>;
type Future = ResponseFuture<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.balancer.poll_ready()
.map_err(Error::Balance)
}
fn call(&mut self, request: Self::Request) -> Self::Future {
let response = self.balancer.next().call(request);
ResponseFuture { inner: response }
}
}
// ===== impl RoundRobin =====
impl<T> RoundRobin<T>
where T: Discover,
{
// ===== potentially a trait API =====
/// Returns `Ready` when the balancer is ready to accept a request.
fn poll_ready(&mut self) -> Poll<(), T::DiscoverError> {
try!(self.update_endpoints());
let len = self.endpoints.len();
// Loop over endpoints, finding the first that is available.
for _ in 0..len {
let res = self.endpoints
.get_index_mut(self.pos)
.unwrap().1
.poll_ready();
match res {
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
}
// Go to the next one
Ok(Async::NotReady) => {}
// TODO: How to handle the error
Err(_) => {}
}
self.inc_pos();
}
// Not ready
Ok(Async::NotReady)
}
/// Returns a reference to the service to dispatch the next request to.
///
/// It is expected that once `poll_ready` returns `Ready`, this function
/// must succeed.
///
/// # Panics
///
/// This function may panic if `poll_ready` did not return `Ready`
/// immediately prior.
fn next(&mut self) -> &mut T::Service {
let pos = self.pos;
self.inc_pos();
match self.endpoints.get_index_mut(pos) {
Some((_, val)) => val,
_ => panic!(),
}
}
// ===== internal =====
fn update_endpoints(&mut self) -> Result<(), T::DiscoverError> {
use tower_discover::Change::*;
loop {
let change = match try!(self.discover.poll()) {
Async::Ready(change) => change,
Async::NotReady => return Ok(()),
};
match change {
Insert(key, val) => {
// TODO: What to do if there already is an entry with the
// given `key` in the set?
self.endpoints.entry(key).or_insert(val);
}
Remove(key) => {
// TODO: What to do if there is no entry?
self.endpoints.remove(&key);
}
}
// For now, we're just going to reset the pos to zero, but this is
// definitely not ideal.
//
// TODO: improve
self.pos = 0;
}
}
fn inc_pos(&mut self) {
self.pos = (self.pos + 1) % self.endpoints.len();
}
}
// ===== impl ResponseFuture =====
impl<T> Future for ResponseFuture<T>
where T: Discover,
{
type Item = T::Response;
type Error = Error<T::Error, T::DiscoverError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
.map_err(Error::Inner)
}
}

View File

@@ -0,0 +1,8 @@
[package]
name = "tower-discover"
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
[dependencies]
futures = "0.1"
tower = { version = "0.1", path = "../" }

11
tower-discover/README.md Normal file
View File

@@ -0,0 +1,11 @@
Tower Discovery
Abstracts over service discovery strategies.
# License
`tower-discover` is primarily distributed under the terms of both the MIT license
and the Apache License (Version 2.0), with portions covered by various BSD-like
licenses.
See LICENSE-APACHE, and LICENSE-MIT for details.

92
tower-discover/src/lib.rs Normal file
View File

@@ -0,0 +1,92 @@
//! # Tower service discovery
//!
//! Service discovery is the automatic detection of services available to the
//! consumer. These services typically live on other servers and are accessible
//! via the network; however, it is possible to discover services available in
//! other processes or even in process.
extern crate futures;
extern crate tower;
use futures::{Poll, Async};
use tower::Service;
use std::hash::Hash;
use std::iter::{Enumerate, IntoIterator};
/// Provide a uniform set of services able to satisfy a request.
///
/// This set of services may be updated over time. On each change to the set, a
/// new `NewServiceSet` is yielded by `Discover`.
///
/// See crate documentation for more details.
pub trait Discover {
/// NewService key
type Key: Hash + Eq;
/// Requests handled by the discovered services
type Request;
/// Responses given by the discovered services
type Response;
/// Errors produced by the discovered services
type Error;
/// The discovered `Service` instance.
type Service: Service<Request = Self::Request,
Response = Self::Response,
Error = Self::Error>;
/// Error produced during discovery
type DiscoverError;
/// Yields the next discovery change set.
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError>;
}
/// A change in the service set
pub enum Change<K, V> {
Insert(K, V),
Remove(K),
}
/// Static service discovery based on a predetermined list of services.
///
/// `List` is created with an initial list of services. The discovery process
/// will yield this list once and do nothing after.
pub struct List<T> {
inner: Enumerate<T>,
}
// ===== impl List =====
impl<T, U> List<T>
where T: Iterator<Item = U>,
U: Service,
{
pub fn new<I>(services: I) -> List<T>
where I: IntoIterator<Item = U, IntoIter = T>,
{
List { inner: services.into_iter().enumerate() }
}
}
impl<T, U> Discover for List<T>
where T: Iterator<Item = U>,
U: Service,
{
type Key = usize;
type Request = U::Request;
type Response = U::Response;
type Error = U::Error;
type Service = U;
type DiscoverError = ();
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
match self.inner.next() {
Some((i, service)) => Ok(Change::Insert(i, service).into()),
None => Ok(Async::NotReady),
}
}
}

View File

@@ -0,0 +1,8 @@
[package]
name = "tower-reconnect"
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
[dependencies]
futures = "0.1"
tower = { version = "0.1", path = "../" }

12
tower-reconnect/README.md Normal file
View File

@@ -0,0 +1,12 @@
Tower Reconnect
A Tower middleware that automatically recreates an inner service when an error
is encountered.
# License
`tower-timeout` is primarily distributed under the terms of both the MIT license
and the Apache License (Version 2.0), with portions covered by various BSD-like
licenses.
See LICENSE-APACHE, and LICENSE-MIT for details.

149
tower-reconnect/src/lib.rs Normal file
View File

@@ -0,0 +1,149 @@
extern crate futures;
extern crate tower;
use futures::{Future, Async, Poll};
use tower::{Service, NewService};
use std::fmt;
pub struct Reconnect<T>
where T: NewService,
{
new_service: T,
state: State<T>,
}
#[derive(Debug)]
pub enum Error<T, U> {
Inner(T),
Connect(U),
NotReady,
}
pub struct ResponseFuture<T>
where T: NewService
{
inner: Option<<T::Service as Service>::Future>,
}
#[derive(Debug)]
enum State<T>
where T: NewService
{
Idle,
Connecting(T::Future),
Connected(T::Service),
}
// ===== impl Reconnect =====
impl<T> Reconnect<T>
where T: NewService,
{
pub fn new(new_service: T) -> Self {
Reconnect {
new_service,
state: State::Idle,
}
}
}
impl<T> Service for Reconnect<T>
where T: NewService
{
type Request = T::Request;
type Response = T::Response;
type Error = Error<T::Error, T::InitError>;
type Future = ResponseFuture<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
use self::State::*;
let ret;
let mut state;
loop {
match self.state {
Idle => {
let fut = self.new_service.new_service();
self.state = Connecting(fut);
continue;
}
Connecting(ref mut f) => {
match f.poll() {
Ok(Async::Ready(service)) => {
state = Connected(service);
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Err(e) => {
state = Idle;
ret = Err(Error::Connect(e));
break;
}
}
}
Connected(ref mut inner) => {
match inner.poll_ready() {
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Err(_) => {
state = Idle;
}
}
}
}
self.state = state;
}
self.state = state;
ret
}
fn call(&mut self, request: Self::Request) -> Self::Future {
use self::State::*;
let service = match self.state {
Connected(ref mut service) => service,
_ => return ResponseFuture { inner: None },
};
let fut = service.call(request);
ResponseFuture { inner: Some(fut) }
}
}
impl<T> fmt::Debug for Reconnect<T>
where T: NewService + fmt::Debug,
T::Future: fmt::Debug,
T::Service: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Reconnect")
.field("new_service", &self.new_service)
.field("state", &self.state)
.finish()
}
}
// ===== impl ResponseFuture =====
impl<T: NewService> Future for ResponseFuture<T> {
type Item = T::Response;
type Error = Error<T::Error, T::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
Some(ref mut f) => {
f.poll().map_err(Error::Inner)
}
None => Err(Error::NotReady),
}
}
}