blob: 9e14378564acdc8886fc81874f60975f827e5ac7 [file] [log] [blame]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod handlers;
use crate::SchedulerServer;
use anyhow::Result;
use std::{
pin::Pin,
task::{Context as TaskContext, Poll},
};
use warp::filters::BoxedFilter;
use warp::{Buf, Filter, Reply};
pub enum EitherBody<A, B> {
Left(A),
Right(B),
}
pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type HttpBody = dyn http_body::Body<Data = dyn Buf, Error = Error> + 'static;
impl<A, B> http_body::Body for EitherBody<A, B>
where
A: http_body::Body + Send + Unpin,
B: http_body::Body<Data = A::Data> + Send + Unpin,
A::Error: Into<Error>,
B::Error: Into<Error>,
{
type Data = A::Data;
type Error = Error;
fn poll_data(
self: Pin<&mut Self>,
cx: &mut TaskContext<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
}
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut TaskContext<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
}
}
fn is_end_stream(&self) -> bool {
match self {
EitherBody::Left(b) => b.is_end_stream(),
EitherBody::Right(b) => b.is_end_stream(),
}
}
}
fn map_option_err<T, U: Into<Error>>(
err: Option<Result<T, U>>,
) -> Option<Result<T, Error>> {
err.map(|e| e.map_err(Into::into))
}
fn with_data_server(
db: SchedulerServer,
) -> impl Filter<Extract = (SchedulerServer,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || db.clone())
}
pub fn get_routes(scheduler_server: SchedulerServer) -> BoxedFilter<(impl Reply,)> {
let routes = warp::path("executors")
.and(with_data_server(scheduler_server))
.and_then(handlers::list_executors_data);
routes.boxed()
}