diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9db1044..70b2a9f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: clippy: name: Clippy runs-on: ubuntu-latest - timeout-minutes: 45 + timeout-minutes: 30 steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@master @@ -37,6 +37,7 @@ jobs: fmt: name: Rust fmt runs-on: ubuntu-latest + timeout-minutes: 10 steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@master @@ -47,6 +48,7 @@ jobs: deny: runs-on: ubuntu-latest + timeout-minutes: 10 steps: - uses: actions/checkout@v4 - uses: EmbarkStudios/cargo-deny-action@v2 @@ -54,7 +56,7 @@ jobs: doc: name: Documentation runs-on: ubuntu-latest - timeout-minutes: 45 + timeout-minutes: 30 env: RUSTDOCFLAGS: -Dwarnings steps: @@ -63,30 +65,45 @@ jobs: with: toolchain: stable components: rustfmt - - run: cargo doc --color=always --verbose --no-deps + - run: cargo doc --all-features --color=always --verbose --workspace --no-deps build: runs-on: ubuntu-latest - timeout-minutes: 45 + timeout-minutes: 30 steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@master with: toolchain: stable components: rustc-dev - - run: cargo check --benches --all-features --release + - run: cargo check --workspace --tests --all-features --release + + doc_test: + name: Doc tests + needs: build + runs-on: ubuntu-latest + timeout-minutes: 30 + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@nightly + with: + components: llvm-tools, rustc-dev + - run: > + cargo test --doc + --verbose --color always + --workspace --no-fail-fast test: name: Tests needs: build runs-on: ubuntu-latest - timeout-minutes: 45 + timeout-minutes: 30 steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@nightly with: components: llvm-tools, rustc-dev - run: > - cargo test --all-features --release - --tests --verbose --color always + cargo test --release --tests + --verbose --color always --workspace --no-fail-fast diff --git a/Cargo.toml b/Cargo.toml index b454838..f9961e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "prosa-hyper" -version = "0.2.0" +version = "0.3.0" authors = ["Jérémy HERGAULT ", "Anthony THOMAS ", "Julien TERUEL ", "Rene-Louis EYMARD "] description = "ProSA Hyper processor for HTTP client/server" homepage = "https://worldline.com/" @@ -20,10 +20,10 @@ adaptor = ["server::adaptor::HelloHyperServerAdaptor"] bytes = "1" thiserror = "2" serde = { version = "1", features = ["derive"] } -tokio = { version = "1", features = ["macros", "net", "rt", "rt-multi-thread"] } +tokio = { version = ">=1.48, < 2", features = ["macros", "net", "rt", "rt-multi-thread"] } tracing = "0.1" -prosa-utils = "0.3" -prosa = "0.3" +prosa-utils = "0.4" +prosa = "0.4" aquamarine = "0.6" url = { version = "2", features = ["serde"] } @@ -34,7 +34,7 @@ http-body-util = "0.1" hyper-util = { version = "0.1", features = ["full"] } [dev-dependencies] -openssl = "0.10" +openssl = ">=0.10.75, < 0.11" reqwest = { version = "0.12", features = ["rustls-tls"] } config = { version = "0.15", default-features = false, features = ["toml", "json", "yaml", "json5", "convert-case", "async"] } clap = "4" diff --git a/README.md b/README.md index d85676c..3ce92f3 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ # ProSA Hyper +[github](https://github.com/worldline/ProSA-Hyper) +[crates-io](https://crates.io/crates/prosa-hyper) +[docs-rs](https://docs.rs/prosa-hyper) +[build status](https://github.com/worldline/ProSA-Hyper/actions?query=branch%3Amain) +[dependency status](https://deps.rs/repo/github/worldline/ProSA-Hyper) + [ProSA](https://github.com/worldline/ProSA) Hyper processor for HTTP client/server build on [Hyper](https://hyper.rs/), a Tokio implementation of HTTP. ## Use @@ -30,6 +36,8 @@ http_server: passphrase: MySuperPassphrase ``` +If you have some slow services, you can set the `service_timeout` parameter (800 ms by default). + ## Examples ### Server diff --git a/examples/server.rs b/examples/server.rs index ccc31b5..73a15fd 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -11,7 +11,7 @@ use hyper::{Request, Response}; use prosa::core::adaptor::Adaptor; use prosa::core::error::ProcError; use prosa::core::main::MainRunnable as _; -use prosa::core::proc::{Proc, ProcConfig}; +use prosa::core::proc::{Proc, ProcBusParam as _, ProcConfig}; use prosa::core::settings::settings; use prosa::stub::adaptor::StubParotAdaptor; use prosa::stub::proc::StubSettings; @@ -41,12 +41,9 @@ where + prosa_utils::msg::tvf::Tvf + std::default::Default, { - fn new( - _proc: &HyperServerProc, - prosa_name: &str, - ) -> Result> { + fn new(proc: &HyperServerProc) -> Result> { Ok(HyperDemoAdaptor { - prosa_name: prosa_name.into(), + prosa_name: proc.name().to_string(), }) } @@ -57,6 +54,10 @@ where match req.uri().path() { "/" => HyperResp::HttpResp( Response::builder() + .header( + "Server", + >::SERVER_HEADER, + ) .body(BoxBody::new(Full::new(Bytes::from(format!( "{} - Home of {}", if req.version() == hyper::Version::HTTP_2 { @@ -77,6 +78,10 @@ where _ => HyperResp::HttpResp( Response::builder() .status(404) + .header( + "Server", + >::SERVER_HEADER, + ) .body(BoxBody::new(Full::new(Bytes::from("Not Found")))) .unwrap(), ), @@ -137,22 +142,31 @@ async fn main() -> Result<(), Box> { prosa_hyper_settings.observability.tracing_init(&filter)?; // Create bus and main processor - let (bus, main) = MainProc::::create(&prosa_hyper_settings); + let (bus, main) = MainProc::::create(&prosa_hyper_settings, Some(2)); // Launch the main task debug!("Launch the main task"); let main_task = main.run(); debug!("Start the Hyper processor"); - let http_proc = - HyperServerProc::::create(1, bus.clone(), prosa_hyper_settings.hyper); - Proc::::run(http_proc, String::from("hyper")); + let http_proc = HyperServerProc::::create( + 1, + String::from("hyper"), + bus.clone(), + prosa_hyper_settings.hyper, + ); + Proc::::run(http_proc); if matches.contains_id("stub") && matches.get_flag("stub") { debug!("Start a Stub processor"); let stub_settings = StubSettings::new(vec![String::from("SRV_TEST")]); - let stub_proc = StubProc::::create(2, bus.clone(), stub_settings); - Proc::::run(stub_proc, String::from("STUB_PROC")); + let stub_proc = StubProc::::create( + 2, + String::from("STUB_PROC"), + bus.clone(), + stub_settings, + ); + Proc::::run(stub_proc); } // Wait on main task diff --git a/src/lib.rs b/src/lib.rs index 067036e..b10062d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,11 @@ +#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/COPYRIGHT"))] +//! +//! [![github]](https://github.com/worldline/ProSA-Hyper) [![crates-io]](https://crates.io/crates/prosa-hyper) [![docs-rs]](crate) +//! +//! [github]: https://img.shields.io/badge/github-46beaa?style=for-the-badge&labelColor=555555&logo=github +//! [crates-io]: https://img.shields.io/badge/crates.io-ffeb78?style=for-the-badge&labelColor=555555&logo=rust +//! [docs-rs]: https://img.shields.io/badge/docs.rs-41b4d2?style=for-the-badge&labelColor=555555&logo=docs.rs +//! //! ProSA Hyper processor to handle HTTP client and server #![warn(missing_docs)] diff --git a/src/server.rs b/src/server.rs index 44423fa..8e69abf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,16 +1,5 @@ //! Module to handle HTTP server -use std::time::{Duration, SystemTime}; - -use prosa::core::{ - error::BusError, - msg::{InternalMsg, Msg}, -}; - -use tokio::sync::oneshot; - -use tracing::{Level, Span, span}; - /// Adaptor for Hyper server processor pub mod adaptor; /// ProSA Hyper server processor @@ -18,123 +7,6 @@ pub mod proc; /// Hyper service definition pub(crate) mod service; -/// Hyper processor -#[derive(Debug)] -pub struct HyperProcMsg -where - M: 'static - + std::marker::Send - + std::marker::Sync - + std::marker::Sized - + std::clone::Clone - + std::fmt::Debug - + prosa_utils::msg::tvf::Tvf - + std::default::Default, -{ - id: u64, - span: Span, - service: String, - data: Option, - begin_time: SystemTime, - response_queue: Option>>, -} - -impl HyperProcMsg -where - M: 'static - + std::marker::Send - + std::marker::Sync - + std::marker::Sized - + std::clone::Clone - + std::fmt::Debug - + prosa_utils::msg::tvf::Tvf - + std::default::Default, -{ - /// Create a new Hyper processor message - pub fn new( - service: String, - data: M, - response_queue: oneshot::Sender>, - ) -> HyperProcMsg { - let span = span!(Level::INFO, "HyperProcMsg", service = service); - HyperProcMsg { - id: 0, - service, - span, - data: Some(data), - begin_time: SystemTime::now(), - response_queue: Some(response_queue), - } - } - - /// Set the ID of the Hyper processor - pub fn set_id(&mut self, id: u64) { - self.id = id; - } - - /// Get the response queue to respond to the message - pub fn get_response_queue(&mut self) -> Result>, BusError> { - self.response_queue - .take() - .ok_or(BusError::InternalQueue("HyperProcMsg".to_string())) - } -} - -impl Msg for HyperProcMsg -where - M: 'static - + std::marker::Send - + std::marker::Sync - + std::marker::Sized - + std::clone::Clone - + std::fmt::Debug - + prosa_utils::msg::tvf::Tvf - + std::default::Default, -{ - fn get_id(&self) -> u64 { - self.id - } - - fn get_service(&self) -> &String { - &self.service - } - - fn get_span(&self) -> &Span { - &self.span - } - - fn get_span_mut(&mut self) -> &mut Span { - &mut self.span - } - - fn enter_span(&self) -> span::Entered<'_> { - self.span.enter() - } - - fn get_data(&self) -> Result<&M, BusError> { - self.data.as_ref().ok_or(BusError::NoData) - } - - fn get_data_mut(&mut self) -> Result<&mut M, BusError> { - self.data.as_mut().ok_or(BusError::NoData) - } - - fn elapsed(&self) -> Duration { - self.begin_time.elapsed().unwrap_or(Duration::new(0, 0)) - } - - fn take_data(&mut self) -> Option { - self.data.take() - } - - fn take_data_if

(&mut self, predicate: P) -> Option - where - P: FnOnce(&mut M) -> bool, - { - self.data.take_if(predicate) - } -} - #[cfg(test)] mod tests { use bytes::Bytes; @@ -226,7 +98,6 @@ mod tests { { fn new( _proc: &crate::server::proc::HyperServerProc, - _prosa_name: &str, ) -> Result> where Self: Sized, @@ -266,15 +137,19 @@ mod tests { let url = settings.server.listener.url.clone(); // Create bus and main processor - let (bus, main) = MainProc::::create(&settings); + let (bus, main) = MainProc::::create(&settings, Some(1)); // Launch the main task let main_task = main.run(); // Launch an HTTP server processor - let http_server_proc = - HyperServerProc::::create(1, bus.clone(), settings.server); - Proc::::run(http_server_proc, String::from("HTTP_SERVER_PROC")); + let http_server_proc = HyperServerProc::::create( + 1, + String::from("HTTP_SERVER_PROC"), + bus.clone(), + settings.server, + ); + Proc::::run(http_server_proc); // Wait for processor to start std::thread::sleep(Duration::from_secs(1)); diff --git a/src/server/adaptor.rs b/src/server/adaptor.rs index c17f023..2e34d87 100644 --- a/src/server/adaptor.rs +++ b/src/server/adaptor.rs @@ -4,7 +4,7 @@ use std::convert::Infallible; use bytes::Bytes; use http_body_util::{Full, combinators::BoxBody}; use hyper::{Request, Response}; -use prosa::core::{adaptor::Adaptor, error::ProcError}; +use prosa::core::{adaptor::Adaptor, error::ProcError, proc::ProcBusParam as _}; use crate::HyperResp; @@ -29,7 +29,7 @@ where + std::marker::Sized + std::clone::Clone + std::fmt::Debug - + prosa_utils::msg::tvf::Tvf + + prosa::core::msg::Tvf + std::default::Default, { /// Server header value send by the server @@ -43,10 +43,7 @@ where const SERVER_HEADER: &'static str = concat!("ProSA-Hyper/", env!("CARGO_PKG_VERSION")); /// Create a new adaptor - fn new( - proc: &HyperServerProc, - prosa_name: &str, - ) -> Result> + fn new(proc: &HyperServerProc) -> Result> where Self: Sized; @@ -74,15 +71,12 @@ where + std::marker::Sized + std::clone::Clone + std::fmt::Debug - + prosa_utils::msg::tvf::Tvf + + prosa::core::msg::Tvf + std::default::Default, { - fn new( - _proc: &HyperServerProc, - prosa_name: &str, - ) -> Result> { + fn new(proc: &HyperServerProc) -> Result> { Ok(HelloHyperServerAdaptor { - hello_msg: format!("Hello from {prosa_name}"), + hello_msg: format!("Hello from {}", proc.name()), }) } diff --git a/src/server/proc.rs b/src/server/proc.rs index 008a96a..b121816 100644 --- a/src/server/proc.rs +++ b/src/server/proc.rs @@ -7,23 +7,22 @@ use prosa::{ core::{ adaptor::Adaptor, error::ProcError, - msg::{ErrorMsg, InternalMsg, Msg, RequestMsg}, + msg::{InternalMsg, Msg, RequestMsg}, proc::{Proc, ProcBusParam, ProcConfig as _, proc, proc_settings}, service::ServiceError, }, event::pending::PendingMsgs, - io::{listener::ListenerSetting, url_is_ssl}, + io::{SslConfig, listener::ListenerSetting, url_is_ssl}, }; -use prosa_utils::config::ssl::SslConfig; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; -use tracing::{Level, debug, info, span, warn}; +use tracing::{debug, info, warn}; use url::Url; use crate::{H2, server::service::HyperService}; -use super::{HyperProcMsg, adaptor::HyperServerAdaptor}; +use super::adaptor::HyperServerAdaptor; /// Hyper server processor settings #[proc_settings] @@ -84,23 +83,23 @@ where + std::marker::Sized + std::clone::Clone + std::fmt::Debug - + prosa_utils::msg::tvf::Tvf + + prosa::core::msg::Tvf + std::default::Default, A: 'static + Adaptor + HyperServerAdaptor + Clone + std::marker::Send + std::marker::Sync, { /// Main loop of the processor - async fn internal_run(&mut self, name: String) -> Result<(), Box> { - // Initiate an adaptor for the stub processor - let adaptor = A::new(self, &name)?; + async fn internal_run(&mut self) -> Result<(), Box> { + // Initiate an adaptor for the hyper server processor + let adaptor = A::new(self)?; // Add proc main queue (id: 0) self.proc.add_proc().await?; // Declare an internal queue for HTTP requests - let (http_tx, mut http_rx) = mpsc::channel::>(2048); + let (http_tx, mut http_rx) = mpsc::channel::>(2048); // Declare a list for pending HTTP request - let mut pending_req = PendingMsgs::, M>::default(); + let mut pending_req = PendingMsgs::, M>::default(); // Set default protocol to HTTP2 if url_is_ssl(&self.settings.listener.url) { @@ -136,17 +135,16 @@ where self.get_proc_id(), msg ), - InternalMsg::Response(msg) => { - if let Some(mut hyper_msg) = pending_req.pull_msg(msg.get_id()) { - let response_queue = hyper_msg.get_response_queue()?; - let _ = response_queue.send(InternalMsg::Response(msg)); + InternalMsg::Response(mut msg) => { + if let Some(hyper_msg) = pending_req.pull_msg(msg.get_id()) + && let Some(data) = msg.take_data() + { + let _ = hyper_msg.return_to_sender(data); } } - InternalMsg::Error(err_msg) => { - if let Some(mut hyper_err_msg) = pending_req.pull_msg(err_msg.get_id()) { - let response_queue = hyper_err_msg.get_response_queue()?; - let _ = response_queue - .send(InternalMsg::Error(err_msg)); + InternalMsg::Error(mut err_msg) => { + if let Some(hyper_err_msg) = pending_req.pull_msg(err_msg.get_id()) { + let _ = hyper_err_msg.return_error_to_sender(err_msg.take_data(), err_msg.into_err()); } } InternalMsg::Command(_) => todo!(), @@ -169,10 +167,14 @@ where service.proc_queue.send(InternalMsg::Request(request)).await.unwrap(); pending_req.push_with_id(request_id, http_msg, self.settings.service_timeout); } else { - let service_name = http_msg.get_service().clone(); - let response_queue = http_msg.get_response_queue()?; + warn!( + parent: http_msg.get_span(), + code = "503", + "hyper::server::Msg", + ); let data = http_msg.take_data(); - let _ = response_queue.send(InternalMsg::Error(ErrorMsg::new(http_msg, service_name.clone(), span!(Level::WARN, "hyper::server::Msg", code = "503"), data, ServiceError::UnableToReachService(service_name)))); + let service_name = http_msg.get_service().clone(); + let _ = http_msg.return_error_to_sender(data, ServiceError::UnableToReachService(service_name)); } }, accept_result = listener.accept_raw() => { @@ -222,12 +224,15 @@ where }, Some(mut msg) = pending_req.pull(), if !pending_req.is_empty() => { warn!(parent: msg.get_span(), "Timeout message {:?}", msg); - - let service_name = msg.get_service().clone(); - let span_msg = msg.get_span().clone(); - let response_queue = msg.get_response_queue()?; let data = msg.take_data(); - let _ = response_queue.send(InternalMsg::Error(ErrorMsg::new(msg, service_name.clone(), span_msg, data, ServiceError::Timeout(service_name, self.settings.service_timeout.as_millis() as u64)))); + let service_name = msg.get_service().clone(); + let _ = msg.return_error_to_sender( + data, + ServiceError::Timeout( + service_name, + self.settings.service_timeout.as_millis() as u64, + ), + ); }, } } diff --git a/src/server/service.rs b/src/server/service.rs index d7fd09f..7702de8 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -12,12 +12,11 @@ use hyper::service::Service; use hyper::{Request, Response}; use opentelemetry::KeyValue; use opentelemetry::metrics::Counter; -use prosa::core::msg::{InternalMsg, Msg}; +use prosa::core::msg::{InternalMsg, Msg, RequestMsg}; use tokio::sync::{mpsc, oneshot}; use crate::hyper_version_str; -use super::HyperProcMsg; use super::adaptor::HyperServerAdaptor; #[derive(Debug, Clone)] @@ -31,11 +30,11 @@ where + std::marker::Sized + std::clone::Clone + std::fmt::Debug - + prosa_utils::msg::tvf::Tvf + + prosa::core::msg::Tvf + std::default::Default, { adaptor: Arc, - proc_queue: mpsc::Sender>, + proc_queue: mpsc::Sender>, metric_counter: Counter, } @@ -48,13 +47,13 @@ where + std::marker::Sized + std::clone::Clone + std::fmt::Debug - + prosa_utils::msg::tvf::Tvf + + prosa::core::msg::Tvf + std::default::Default, { /// Method to create an Hyper service pub(crate) fn new( adaptor: Arc, - proc_queue: mpsc::Sender>, + proc_queue: mpsc::Sender>, metric_counter: Counter, ) -> HyperService { HyperService { @@ -66,7 +65,7 @@ where async fn process_call( adaptor: Arc, - proc_queue: mpsc::Sender>, + proc_queue: mpsc::Sender>, req: Request, metric_counter: Counter, ) -> Result>, hyper::Error> { @@ -106,13 +105,13 @@ where /// Method to wait for response send by the ProSA HTTP processor async fn wait_intern_resp( adaptor: Arc, - proc_queue: mpsc::Sender>, + proc_queue: mpsc::Sender>, service_name: String, request: M, ) -> Result>, hyper::Error> { let (resp_tx, resp_rx) = oneshot::channel::>(); let _ = proc_queue - .send(HyperProcMsg::new(service_name, request, resp_tx)) + .send(RequestMsg::new(service_name, request, resp_tx)) .await; match resp_rx.await { @@ -177,7 +176,7 @@ where + std::marker::Sized + std::clone::Clone + std::fmt::Debug - + prosa_utils::msg::tvf::Tvf + + prosa::core::msg::Tvf + std::default::Default + std::marker::Sync, {