blob: 164e818fcde00fbdb214cb5a1f5c9ad54352932a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client, Method, Request, Response, Server, StatusCode};
use skywalking_rust::context::propagation::context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY;
use skywalking_rust::context::propagation::decoder::decode_propagation;
use skywalking_rust::context::propagation::encoder::encode_propagation;
use skywalking_rust::context::trace_context::TracingContext;
use skywalking_rust::reporter::grpc::Reporter;
use std::convert::Infallible;
use std::error::Error;
use std::net::SocketAddr;
use structopt::StructOpt;
use tokio::sync::mpsc;
static NOT_FOUND_MSG: &str = "not found";
static SUCCESS_MSG: &str = "Success";
async fn handle_ping(
_req: Request<Body>,
client: Client<HttpConnector>,
tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
let mut context = TracingContext::default("producer", "node_0");
let span = context.create_entry_span("/ping").unwrap();
{
let span2 = context.create_exit_span("/pong", "consumer:8082").unwrap();
let header = encode_propagation(&context, "/pong", "consumer:8082");
let req = Request::builder()
.method(Method::GET)
.header(SKYWALKING_HTTP_CONTEXT_HEADER_KEY, header)
.uri("http://consumer:8082/pong")
.body(Body::from(""))
.unwrap();
client.request(req).await.unwrap();
context.finalize_span(span2);
}
context.finalize_span(span);
let _ = tx.send(context).await;
Ok(Response::new(Body::from("hoge")))
}
async fn producer_response(
_req: Request<Body>,
client: Client<HttpConnector>,
tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
match (_req.method(), _req.uri().path()) {
(&Method::GET, "/ping") => handle_ping(_req, client, tx).await,
(&Method::GET, "/healthCheck") => Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(SUCCESS_MSG))
.unwrap()),
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from(NOT_FOUND_MSG))
.unwrap()),
}
}
async fn run_producer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
let client = Client::new();
let make_svc = make_service_fn(|_| {
let tx = tx.clone();
let client = client.clone();
async {
Ok::<_, Infallible>(service_fn(move |req| {
producer_response(req, client.to_owned(), tx.to_owned())
}))
}
});
let addr = SocketAddr::from((host, 8081));
let server = Server::bind(&addr).serve(make_svc);
println!("starting producer on {:?}...", &addr);
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}
async fn handle_pong(
_req: Request<Body>,
tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
let ctx = decode_propagation(
&_req.headers()[SKYWALKING_HTTP_CONTEXT_HEADER_KEY]
.to_str()
.unwrap(),
)
.unwrap();
let mut context = TracingContext::from_propagation_context("consumer", "node_0", ctx);
let span = context.create_entry_span("/pong").unwrap();
context.finalize_span(span);
let _ = tx.send(context).await;
Ok(Response::new(Body::from("hoge")))
}
async fn consumer_response(
_req: Request<Body>,
tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
match (_req.method(), _req.uri().path()) {
(&Method::GET, "/pong") => handle_pong(_req, tx).await,
(&Method::GET, "/healthCheck") => Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(SUCCESS_MSG))
.unwrap()),
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from(NOT_FOUND_MSG))
.unwrap()),
}
}
async fn run_consumer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
let make_svc = make_service_fn(|_| {
let tx = tx.clone();
async { Ok::<_, Infallible>(service_fn(move |req| consumer_response(req, tx.to_owned()))) }
});
let addr = SocketAddr::from((host, 8082));
let server = Server::bind(&addr).serve(make_svc);
println!("starting consumer on {:?}...", &addr);
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}
#[derive(StructOpt)]
#[structopt(name = "basic")]
struct Opt {
#[structopt(short, long)]
mode: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let opt = Opt::from_args();
let reporter = Reporter::start("http://collector:19876").await;
let tx = reporter.sender();
if opt.mode == "consumer" {
run_consumer_service([0, 0, 0, 0], tx).await;
} else if opt.mode == "producer" {
run_producer_service([0, 0, 0, 0], tx).await;
}
reporter.shutdown().await?;
Ok(())
}