diff --git a/Cargo.lock b/Cargo.lock index 9f28d215..b6ea9784 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,6 +1859,7 @@ version = "0.1.0" dependencies = [ "prost", "spider-core", + "thiserror", "tonic", "tonic-build", ] @@ -1872,9 +1873,11 @@ dependencies = [ "async-trait", "dashmap", "spider-core", + "spider-proto-rust", "thiserror", "tokio", "tokio-util", + "tonic", ] [[package]] diff --git a/components/spider-execution-manager/src/client.rs b/components/spider-execution-manager/src/client.rs index 63b132ce..c2ac95e4 100644 --- a/components/spider-execution-manager/src/client.rs +++ b/components/spider-execution-manager/src/client.rs @@ -11,7 +11,7 @@ pub mod liveness; pub mod scheduler; pub mod storage; -pub use grpc::GrpcStorageClient; +pub use grpc::{GrpcLivenessClient, GrpcStorageClient}; pub use liveness::{LivenessClient, LivenessResponseError, RegistrationResponse}; pub use scheduler::{SchedulerClient, SchedulerError, SchedulerResponse}; pub use storage::{StorageClient, StorageResponseError}; diff --git a/components/spider-execution-manager/src/client/grpc/liveness.rs b/components/spider-execution-manager/src/client/grpc/liveness.rs new file mode 100644 index 00000000..af5c0fb1 --- /dev/null +++ b/components/spider-execution-manager/src/client/grpc/liveness.rs @@ -0,0 +1,213 @@ +//! gRPC-backed [`LivenessClient`] implementation. + +use std::net::IpAddr; + +use async_trait::async_trait; +use spider_core::types::id::{ExecutionManagerId, SessionId}; +use spider_proto_rust::storage::{ + self, + execution_manager_liveness_error, + execution_manager_liveness_service_client::ExecutionManagerLivenessServiceClient, + register_execution_manager_response, + update_execution_manager_heartbeat_response, +}; +use tonic::transport::{Channel, Endpoint}; + +use crate::client::liveness::{LivenessClient, LivenessResponseError, RegistrationResponse}; + +/// gRPC-backed [`LivenessClient`] implementation. +#[derive(Debug, Clone)] +pub struct GrpcLivenessClient { + client: ExecutionManagerLivenessServiceClient, +} + +impl GrpcLivenessClient { + /// Connects to the storage gRPC endpoint. + /// + /// # Returns + /// + /// A new [`GrpcLivenessClient`] connected to `endpoint` on success. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * [`LivenessResponseError::Transport`] if tonic cannot create or connect to the endpoint. + pub async fn connect(endpoint: Endpoint) -> Result { + ExecutionManagerLivenessServiceClient::connect(endpoint) + .await + .map(|client| Self { client }) + .map_err(to_transport_error) + } +} + +#[async_trait] +impl LivenessClient for GrpcLivenessClient { + async fn register(&self, ip: IpAddr) -> Result { + let request = storage::RegisterExecutionManagerRequest { + ip_address: ip.to_string(), + }; + let response = self + .client + .clone() + .register_execution_manager(request) + .await + .map_err(to_transport_error)? + .into_inner(); + + register_response_to_result(response) + } + + async fn heartbeat( + &self, + em_id: ExecutionManagerId, + ) -> Result { + let request = storage::ExecutionManagerIdRequest { + execution_manager_id: em_id.get(), + }; + let response = self + .client + .clone() + .update_execution_manager_heartbeat(request) + .await + .map_err(to_transport_error)? + .into_inner(); + + heartbeat_response_to_result(response) + } +} + +impl From for LivenessResponseError { + fn from(error: storage::ExecutionManagerLivenessError) -> Self { + match execution_manager_liveness_error::ErrCode::try_from(error.err_code) { + Ok(execution_manager_liveness_error::ErrCode::MarkedDead) => Self::MarkedDead, + Ok(execution_manager_liveness_error::ErrCode::InvalidInput) => { + Self::IllegalId(error.message) + } + Ok( + execution_manager_liveness_error::ErrCode::Server + | execution_manager_liveness_error::ErrCode::Unspecified, + ) => Self::Transport(error.message), + Err(error) => Self::Transport(format!( + "unknown execution manager liveness error kind: {error}" + )), + } + } +} + +/// # Returns +/// +/// [`storage::RegisterExecutionManagerResponse`] converted into +/// [`Result`]. +fn register_response_to_result( + response: storage::RegisterExecutionManagerResponse, +) -> Result { + match response.result { + Some(register_execution_manager_response::Result::Registration(registration)) => { + Ok(RegistrationResponse { + em_id: ExecutionManagerId::from(registration.execution_manager_id), + session_id: registration.session_id, + }) + } + Some(register_execution_manager_response::Result::Error(error)) => Err(error.into()), + None => Err(LivenessResponseError::Transport( + "register execution manager response missing result".to_owned(), + )), + } +} + +/// # Returns +/// +/// [`storage::UpdateExecutionManagerHeartbeatResponse`] converted into +/// [`Result`]. +fn heartbeat_response_to_result( + response: storage::UpdateExecutionManagerHeartbeatResponse, +) -> Result { + match response.result { + Some(update_execution_manager_heartbeat_response::Result::SessionId(session_id)) => { + Ok(session_id) + } + Some(update_execution_manager_heartbeat_response::Result::Error(error)) => { + Err(error.into()) + } + None => Err(LivenessResponseError::Transport( + "update execution manager heartbeat response missing result".to_owned(), + )), + } +} + +/// Converts a displayable transport-layer error into [`LivenessResponseError::Transport`]. +/// +/// # Returns +/// +/// A [`LivenessResponseError::Transport`] containing `error`'s display string. +fn to_transport_error(error: impl std::fmt::Display) -> LivenessResponseError { + LivenessResponseError::Transport(error.to_string()) +} + +#[cfg(test)] +mod tests { + use spider_core::types::id::ExecutionManagerId; + + use super::*; + use crate::client::{LivenessResponseError, RegistrationResponse}; + + #[test] + fn register_response_to_result_returns_registration() { + const SESSION_ID: SessionId = 7; + const EM_ID: ExecutionManagerId = ExecutionManagerId::from(5); + + let response = storage::RegisterExecutionManagerResponse { + result: Some(register_execution_manager_response::Result::Registration( + storage::ExecutionManagerRegistration { + execution_manager_id: EM_ID.get(), + session_id: SESSION_ID, + }, + )), + }; + + let registration = register_response_to_result(response) + .expect("registration response conversion should succeed"); + + assert_eq!( + registration, + RegistrationResponse { + em_id: EM_ID, + session_id: SESSION_ID, + } + ); + } + + #[test] + fn heartbeat_response_to_result_returns_session_id() { + const SESSION_ID: SessionId = 9; + + let response = storage::UpdateExecutionManagerHeartbeatResponse { + result: Some( + update_execution_manager_heartbeat_response::Result::SessionId(SESSION_ID), + ), + }; + + let session_id = heartbeat_response_to_result(response) + .expect("heartbeat response conversion should succeed"); + + assert_eq!(session_id, SESSION_ID); + } + + #[test] + fn liveness_storage_error_maps_invalid_input_to_illegal_id() { + const ERROR_MSG: &str = "bad em id"; + + let error = storage::ExecutionManagerLivenessError { + err_code: execution_manager_liveness_error::ErrCode::InvalidInput.into(), + message: ERROR_MSG.to_owned(), + }; + + match LivenessResponseError::from(error) { + LivenessResponseError::IllegalId(message) => { + assert_eq!(message, ERROR_MSG); + } + error => panic!("unexpected liveness response error: {error:?}"), + } + } +} diff --git a/components/spider-execution-manager/src/client/grpc/mod.rs b/components/spider-execution-manager/src/client/grpc/mod.rs index 9f15ee9a..93a0a50e 100644 --- a/components/spider-execution-manager/src/client/grpc/mod.rs +++ b/components/spider-execution-manager/src/client/grpc/mod.rs @@ -1,5 +1,7 @@ //! gRPC-backed implementations of the execution manager's client traits. +pub mod liveness; pub mod storage; +pub use liveness::GrpcLivenessClient; pub use storage::GrpcStorageClient; diff --git a/components/spider-execution-manager/src/client/grpc/storage.rs b/components/spider-execution-manager/src/client/grpc/storage.rs index c037814a..378de5d4 100644 --- a/components/spider-execution-manager/src/client/grpc/storage.rs +++ b/components/spider-execution-manager/src/client/grpc/storage.rs @@ -11,9 +11,9 @@ use spider_core::types::{ use spider_proto_rust::storage::{ self, register_task_instance_response, - storage_error, - storage_operation_response, + task_instance_management_error, task_instance_management_service_client::TaskInstanceManagementServiceClient, + task_instance_operation_response, }; use tonic::transport::{Channel, Endpoint}; @@ -40,7 +40,7 @@ impl GrpcStorageClient { pub async fn connect(endpoint: Endpoint) -> Result { TaskInstanceManagementServiceClient::connect(endpoint) .await - .map(|inner| Self { client: inner }) + .map(|client| Self { client }) .map_err(to_transport_error) } } @@ -136,32 +136,37 @@ impl StorageClient for GrpcStorageClient { } } -impl From for StorageResponseError { - fn from(error: storage::StorageError) -> Self { - match storage_error::ErrCode::try_from(error.err_code) { - Ok(storage_error::ErrCode::StaleSession) => Self::StaleSession { +impl From for StorageResponseError { + fn from(error: storage::TaskInstanceManagementError) -> Self { + match task_instance_management_error::ErrCode::try_from(error.err_code) { + Ok(task_instance_management_error::ErrCode::StaleSession) => Self::StaleSession { storage_session: error.storage_session, }, - Ok(storage_error::ErrCode::CacheStale) => Self::CacheStale(error.message), - Ok(storage_error::ErrCode::Transport) => Self::Transport(error.message), - Ok(storage_error::ErrCode::Server | storage_error::ErrCode::Unspecified) => { - Self::Server(error.message) + Ok(task_instance_management_error::ErrCode::CacheStale) => { + Self::CacheStale(error.message) } - Ok(storage_error::ErrCode::InvalidInput) => Self::InvalidInput(error.message), - Err(error) => Self::Transport(format!("unknown storage error kind: {error}")), + Ok( + task_instance_management_error::ErrCode::Server + | task_instance_management_error::ErrCode::Unspecified, + ) => Self::Server(error.message), + Ok(task_instance_management_error::ErrCode::InvalidInput) => { + Self::InvalidInput(error.message) + } + Err(error) => Self::Transport(format!("unknown task instance error kind: {error}")), } } } /// # Returns /// -/// [`storage::StorageOperationResponse`] converted into [`Result<(), StorageResponseError>`]. +/// [`storage::TaskInstanceOperationResponse`] converted into +/// [`Result<(), StorageResponseError>`]. fn storage_operation_response_to_result( - response: storage::StorageOperationResponse, + response: storage::TaskInstanceOperationResponse, ) -> Result<(), StorageResponseError> { match response.result { - Some(storage_operation_response::Result::Ok(_)) => Ok(()), - Some(storage_operation_response::Result::Error(error)) => Err(error.into()), + Some(task_instance_operation_response::Result::Ok(_)) => Ok(()), + Some(task_instance_operation_response::Result::Error(error)) => Err(error.into()), None => Err(StorageResponseError::Transport( "storage operation response missing `result` message".to_owned(), )), @@ -183,8 +188,8 @@ mod tests { #[test] fn storage_error_maps_stale_session() { - let error = storage::StorageError { - err_code: storage_error::ErrCode::StaleSession.into(), + let error = storage::TaskInstanceManagementError { + err_code: task_instance_management_error::ErrCode::StaleSession.into(), message: "stale".to_owned(), storage_session: 7, }; @@ -199,7 +204,7 @@ mod tests { #[test] fn storage_error_maps_unknown_kind_to_transport_error() { - let error = storage::StorageError { + let error = storage::TaskInstanceManagementError { err_code: 99, message: "unknown".to_owned(), storage_session: 0, @@ -207,7 +212,7 @@ mod tests { match StorageResponseError::from(error) { StorageResponseError::Transport(message) => { - assert!(message.contains("unknown storage error kind")); + assert!(message.contains("unknown task instance error kind")); } error => panic!("unexpected storage response error: {error:?}"), } @@ -215,7 +220,7 @@ mod tests { #[test] fn missing_storage_operation_result_is_transport_error() { - match storage_operation_response_to_result(storage::StorageOperationResponse { + match storage_operation_response_to_result(storage::TaskInstanceOperationResponse { result: None, }) { Err(StorageResponseError::Transport(_)) => {} diff --git a/components/spider-proto-rust/Cargo.toml b/components/spider-proto-rust/Cargo.toml index 6a5e53db..dc5ec267 100644 --- a/components/spider-proto-rust/Cargo.toml +++ b/components/spider-proto-rust/Cargo.toml @@ -10,6 +10,7 @@ path = "src/lib.rs" [dependencies] prost = "0.13.5" spider-core = { path = "../spider-core" } +thiserror = "2.0.18" tonic = "0.12.3" [build-dependencies] diff --git a/components/spider-proto-rust/src/error.rs b/components/spider-proto-rust/src/error.rs new file mode 100644 index 00000000..f9014e7b --- /dev/null +++ b/components/spider-proto-rust/src/error.rs @@ -0,0 +1,17 @@ +//! Error types for converting protobuf wire values into Spider core types. + +/// Errors produced when converting a protobuf message into its Spider core representation. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// A protobuf task index could not be represented as a [`usize`]. + #[error("task index does not fit in `usize`: {0}")] + TaskIndexOutOfRange(u64), + + /// A protobuf [`crate::storage::TaskId`] carried no `kind`. + #[error("task id missing kind")] + TaskIdKindMissing, + + /// A protobuf [`crate::storage::JobState`] was left unspecified. + #[error("job state is unspecified")] + JobStateUnspecified, +} diff --git a/components/spider-proto-rust/src/generated/storage.rs b/components/spider-proto-rust/src/generated/storage.rs index 4a4cd353..970dc141 100644 --- a/components/spider-proto-rust/src/generated/storage.rs +++ b/components/spider-proto-rust/src/generated/storage.rs @@ -1,4 +1,157 @@ // This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SubmitJobRequest { + #[prost(uint64, tag = "1")] + pub resource_group_id: u64, + #[prost(bytes = "vec", tag = "2")] + pub serialized_task_graph: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "3")] + pub serialized_inputs: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "4")] + pub session_id: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SubmitJobResponse { + #[prost(oneof = "submit_job_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `SubmitJobResponse`. +pub mod submit_job_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(uint64, tag = "1")] + JobId(u64), + #[prost(message, tag = "2")] + Error(super::JobOrchestrationError), + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct JobIdRequest { + #[prost(uint64, tag = "1")] + pub job_id: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JobStateResponse { + #[prost(oneof = "job_state_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `JobStateResponse`. +pub mod job_state_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(enumeration = "super::JobState", tag = "1")] + State(i32), + #[prost(message, tag = "2")] + Error(super::JobOrchestrationError), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JobOutputsResponse { + #[prost(oneof = "job_outputs_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `JobOutputsResponse`. +pub mod job_outputs_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(message, tag = "1")] + Outputs(super::JobOutputs), + #[prost(message, tag = "2")] + Error(super::JobOrchestrationError), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JobOutputs { + #[prost(bytes = "vec", repeated, tag = "1")] + pub outputs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JobErrorResponse { + #[prost(oneof = "job_error_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `JobErrorResponse`. +pub mod job_error_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(string, tag = "1")] + ErrorMessage(::prost::alloc::string::String), + #[prost(message, tag = "2")] + Error(super::JobOrchestrationError), + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct DeleteExpiredTerminatedJobsRequest { + #[prost(uint64, tag = "1")] + pub expire_after_sec: u64, + #[prost(uint64, tag = "2")] + pub session_id: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeleteExpiredTerminatedJobsResponse { + #[prost(oneof = "delete_expired_terminated_jobs_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `DeleteExpiredTerminatedJobsResponse`. +pub mod delete_expired_terminated_jobs_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(message, tag = "1")] + DeletedJobs(super::DeletedJobs), + #[prost(message, tag = "2")] + Error(super::JobOrchestrationError), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeletedJobs { + #[prost(uint64, repeated, tag = "1")] + pub job_ids: ::prost::alloc::vec::Vec, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ResendReadyTasksRequest { + #[prost(uint64, optional, tag = "1")] + pub job_id: ::core::option::Option, + #[prost(uint64, tag = "2")] + pub session_id: u64, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct PollReadyTasksRequest { + #[prost(uint64, tag = "1")] + pub max_items: u64, + #[prost(uint64, tag = "2")] + pub wait_ms: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PollReadyTasksResponse { + #[prost(oneof = "poll_ready_tasks_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `PollReadyTasksResponse`. +pub mod poll_ready_tasks_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(message, tag = "1")] + Tasks(super::ReadyTasks), + #[prost(message, tag = "2")] + Error(super::InboundQueueResponseError), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadyTasks { + #[prost(uint64, tag = "1")] + pub session_id: u64, + #[prost(message, repeated, tag = "2")] + pub tasks: ::prost::alloc::vec::Vec, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadyTask { + #[prost(uint64, tag = "1")] + pub resource_group_id: u64, + #[prost(uint64, tag = "2")] + pub job_id: u64, + #[prost(message, optional, tag = "3")] + pub task_id: ::core::option::Option, +} #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct RegisterTaskInstanceRequest { #[prost(uint64, tag = "1")] @@ -22,7 +175,7 @@ pub mod register_task_instance_response { #[prost(bytes, tag = "1")] ExecutionContext(::prost::alloc::vec::Vec), #[prost(message, tag = "2")] - Error(super::StorageError), + Error(super::TaskInstanceManagementError), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -51,6 +204,103 @@ pub struct ReportTaskFailureRequest { #[prost(string, tag = "5")] pub error_message: ::prost::alloc::string::String, } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AddResourceGroupRequest { + #[prost(string, tag = "1")] + pub external_resource_group_id: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "2")] + pub password: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub session_id: u64, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ResourceGroupIdRequest { + #[prost(uint64, tag = "1")] + pub resource_group_id: u64, + #[prost(uint64, tag = "2")] + pub session_id: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ResourceGroupIdResponse { + #[prost(oneof = "resource_group_id_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `ResourceGroupIdResponse`. +pub mod resource_group_id_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(uint64, tag = "1")] + ResourceGroupId(u64), + #[prost(message, tag = "2")] + Error(super::ResourceGroupManagementError), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VerifyResourceGroupRequest { + #[prost(uint64, tag = "1")] + pub resource_group_id: u64, + #[prost(bytes = "vec", tag = "2")] + pub password: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub session_id: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RegisterExecutionManagerRequest { + #[prost(string, tag = "1")] + pub ip_address: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ExecutionManagerIdRequest { + #[prost(uint64, tag = "1")] + pub execution_manager_id: u64, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ExecutionManagerRegistration { + #[prost(uint64, tag = "1")] + pub execution_manager_id: u64, + #[prost(uint64, tag = "2")] + pub session_id: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RegisterExecutionManagerResponse { + #[prost(oneof = "register_execution_manager_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `RegisterExecutionManagerResponse`. +pub mod register_execution_manager_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(message, tag = "1")] + Registration(super::ExecutionManagerRegistration), + #[prost(message, tag = "2")] + Error(super::ExecutionManagerLivenessError), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct UpdateExecutionManagerHeartbeatResponse { + #[prost( + oneof = "update_execution_manager_heartbeat_response::Result", + tags = "1, 2" + )] + pub result: ::core::option::Option< + update_execution_manager_heartbeat_response::Result, + >, +} +/// Nested message and enum types in `UpdateExecutionManagerHeartbeatResponse`. +pub mod update_execution_manager_heartbeat_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(uint64, tag = "1")] + SessionId(u64), + #[prost(message, tag = "2")] + Error(super::ExecutionManagerLivenessError), + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct GetSessionResponse { + #[prost(uint64, tag = "1")] + pub session_id: u64, +} #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct TaskId { #[prost(oneof = "task_id::Kind", tags = "1, 2, 3")] @@ -69,33 +319,120 @@ pub mod task_id { } } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct StorageOperationResponse { - #[prost(oneof = "storage_operation_response::Result", tags = "1, 2")] - pub result: ::core::option::Option, +pub struct JobManagementOperationResponse { + #[prost(oneof = "job_management_operation_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `JobManagementOperationResponse`. +pub mod job_management_operation_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(message, tag = "1")] + Ok(super::Void), + #[prost(message, tag = "2")] + Error(super::JobOrchestrationError), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TaskInstanceOperationResponse { + #[prost(oneof = "task_instance_operation_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `TaskInstanceOperationResponse`. +pub mod task_instance_operation_response { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + #[prost(message, tag = "1")] + Ok(super::Void), + #[prost(message, tag = "2")] + Error(super::TaskInstanceManagementError), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ResourceGroupOperationResponse { + #[prost(oneof = "resource_group_operation_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, } -/// Nested message and enum types in `StorageOperationResponse`. -pub mod storage_operation_response { +/// Nested message and enum types in `ResourceGroupOperationResponse`. +pub mod resource_group_operation_response { #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "1")] Ok(super::Void), #[prost(message, tag = "2")] - Error(super::StorageError), + Error(super::ResourceGroupManagementError), } } #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Void {} #[derive(Clone, PartialEq, ::prost::Message)] -pub struct StorageError { - #[prost(enumeration = "storage_error::ErrCode", tag = "1")] +pub struct JobOrchestrationError { + #[prost(enumeration = "job_orchestration_error::ErrCode", tag = "1")] + pub err_code: i32, + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, + #[prost(uint64, tag = "3")] + pub storage_session: u64, +} +/// Nested message and enum types in `JobOrchestrationError`. +pub mod job_orchestration_error { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum ErrCode { + Unspecified = 0, + StaleSession = 1, + Server = 2, + InvalidInput = 3, + JobNotFound = 4, + } + impl ErrCode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "ERR_CODE_UNSPECIFIED", + Self::StaleSession => "STALE_SESSION", + Self::Server => "SERVER", + Self::InvalidInput => "INVALID_INPUT", + Self::JobNotFound => "JOB_NOT_FOUND", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "ERR_CODE_UNSPECIFIED" => Some(Self::Unspecified), + "STALE_SESSION" => Some(Self::StaleSession), + "SERVER" => Some(Self::Server), + "INVALID_INPUT" => Some(Self::InvalidInput), + "JOB_NOT_FOUND" => Some(Self::JobNotFound), + _ => None, + } + } + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TaskInstanceManagementError { + #[prost(enumeration = "task_instance_management_error::ErrCode", tag = "1")] pub err_code: i32, #[prost(string, tag = "2")] pub message: ::prost::alloc::string::String, #[prost(uint64, tag = "3")] pub storage_session: u64, } -/// Nested message and enum types in `StorageError`. -pub mod storage_error { +/// Nested message and enum types in `TaskInstanceManagementError`. +pub mod task_instance_management_error { #[derive( Clone, Copy, @@ -112,9 +449,8 @@ pub mod storage_error { Unspecified = 0, StaleSession = 1, CacheStale = 2, - Transport = 3, - Server = 4, - InvalidInput = 5, + Server = 3, + InvalidInput = 4, } impl ErrCode { /// String value of the enum field names used in the ProtoBuf definition. @@ -126,7 +462,6 @@ pub mod storage_error { Self::Unspecified => "ERR_CODE_UNSPECIFIED", Self::StaleSession => "STALE_SESSION", Self::CacheStale => "CACHE_STALE", - Self::Transport => "TRANSPORT", Self::Server => "SERVER", Self::InvalidInput => "INVALID_INPUT", } @@ -137,7 +472,6 @@ pub mod storage_error { "ERR_CODE_UNSPECIFIED" => Some(Self::Unspecified), "STALE_SESSION" => Some(Self::StaleSession), "CACHE_STALE" => Some(Self::CacheStale), - "TRANSPORT" => Some(Self::Transport), "SERVER" => Some(Self::Server), "INVALID_INPUT" => Some(Self::InvalidInput), _ => None, @@ -145,39 +479,241 @@ pub mod storage_error { } } } -/// Generated client implementations. -pub mod task_instance_management_service_client { - #![allow( - unused_variables, - dead_code, - missing_docs, - clippy::wildcard_imports, - clippy::let_unit_value, +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InboundQueueResponseError { + #[prost(enumeration = "inbound_queue_response_error::ErrCode", tag = "1")] + pub err_code: i32, + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, +} +/// Nested message and enum types in `InboundQueueResponseError`. +pub mod inbound_queue_response_error { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration )] - use tonic::codegen::*; - use tonic::codegen::http::Uri; - #[derive(Debug, Clone)] - pub struct TaskInstanceManagementServiceClient { - inner: tonic::client::Grpc, + #[repr(i32)] + pub enum ErrCode { + Unspecified = 0, + InboundClosed = 1, + Server = 2, + InvalidInput = 3, } - impl TaskInstanceManagementServiceClient { - /// Attempt to create a new client by connecting to a given endpoint. - pub async fn connect(dst: D) -> Result - where - D: TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; - Ok(Self::new(conn)) + impl ErrCode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "ERR_CODE_UNSPECIFIED", + Self::InboundClosed => "INBOUND_CLOSED", + Self::Server => "SERVER", + Self::InvalidInput => "INVALID_INPUT", + } } - } - impl TaskInstanceManagementServiceClient - where - T: tonic::client::GrpcService, - T::Error: Into, - T::ResponseBody: Body + std::marker::Send + 'static, - ::Error: Into + std::marker::Send, - { + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "ERR_CODE_UNSPECIFIED" => Some(Self::Unspecified), + "INBOUND_CLOSED" => Some(Self::InboundClosed), + "SERVER" => Some(Self::Server), + "INVALID_INPUT" => Some(Self::InvalidInput), + _ => None, + } + } + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ResourceGroupManagementError { + #[prost(enumeration = "resource_group_management_error::ErrCode", tag = "1")] + pub err_code: i32, + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, + #[prost(uint64, tag = "3")] + pub storage_session: u64, +} +/// Nested message and enum types in `ResourceGroupManagementError`. +pub mod resource_group_management_error { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum ErrCode { + Unspecified = 0, + StaleSession = 1, + Server = 2, + InvalidInput = 3, + } + impl ErrCode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "ERR_CODE_UNSPECIFIED", + Self::StaleSession => "STALE_SESSION", + Self::Server => "SERVER", + Self::InvalidInput => "INVALID_INPUT", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "ERR_CODE_UNSPECIFIED" => Some(Self::Unspecified), + "STALE_SESSION" => Some(Self::StaleSession), + "SERVER" => Some(Self::Server), + "INVALID_INPUT" => Some(Self::InvalidInput), + _ => None, + } + } + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ExecutionManagerLivenessError { + #[prost(enumeration = "execution_manager_liveness_error::ErrCode", tag = "1")] + pub err_code: i32, + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, +} +/// Nested message and enum types in `ExecutionManagerLivenessError`. +pub mod execution_manager_liveness_error { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum ErrCode { + Unspecified = 0, + MarkedDead = 1, + InvalidInput = 2, + Server = 3, + } + impl ErrCode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "ERR_CODE_UNSPECIFIED", + Self::MarkedDead => "MARKED_DEAD", + Self::InvalidInput => "INVALID_INPUT", + Self::Server => "SERVER", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "ERR_CODE_UNSPECIFIED" => Some(Self::Unspecified), + "MARKED_DEAD" => Some(Self::MarkedDead), + "INVALID_INPUT" => Some(Self::InvalidInput), + "SERVER" => Some(Self::Server), + _ => None, + } + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum JobState { + Unspecified = 0, + Ready = 1, + Running = 2, + CommitReady = 3, + CleanupReady = 4, + Succeeded = 5, + Failed = 6, + Cancelled = 7, +} +impl JobState { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "JOB_STATE_UNSPECIFIED", + Self::Ready => "READY", + Self::Running => "RUNNING", + Self::CommitReady => "COMMIT_READY", + Self::CleanupReady => "CLEANUP_READY", + Self::Succeeded => "SUCCEEDED", + Self::Failed => "FAILED", + Self::Cancelled => "CANCELLED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "JOB_STATE_UNSPECIFIED" => Some(Self::Unspecified), + "READY" => Some(Self::Ready), + "RUNNING" => Some(Self::Running), + "COMMIT_READY" => Some(Self::CommitReady), + "CLEANUP_READY" => Some(Self::CleanupReady), + "SUCCEEDED" => Some(Self::Succeeded), + "FAILED" => Some(Self::Failed), + "CANCELLED" => Some(Self::Cancelled), + _ => None, + } + } +} +/// Generated client implementations. +pub mod job_orchestration_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct JobOrchestrationServiceClient { + inner: tonic::client::Grpc, + } + impl JobOrchestrationServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl JobOrchestrationServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } @@ -189,7 +725,7 @@ pub mod task_instance_management_service_client { pub fn with_interceptor( inner: T, interceptor: F, - ) -> TaskInstanceManagementServiceClient> + ) -> JobOrchestrationServiceClient> where F: tonic::service::Interceptor, T::ResponseBody: Default, @@ -203,7 +739,7 @@ pub mod task_instance_management_service_client { http::Request, >>::Error: Into + std::marker::Send + std::marker::Sync, { - TaskInstanceManagementServiceClient::new( + JobOrchestrationServiceClient::new( InterceptedService::new(inner, interceptor), ) } @@ -238,11 +774,11 @@ pub mod task_instance_management_service_client { self.inner = self.inner.max_encoding_message_size(limit); self } - pub async fn register_task_instance( + pub async fn submit_job( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, > { self.inner @@ -255,23 +791,92 @@ pub mod task_instance_management_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/storage.TaskInstanceManagementService/RegisterTaskInstance", + "/storage.JobOrchestrationService/SubmitJob", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("storage.JobOrchestrationService", "SubmitJob")); + self.inner.unary(req, path, codec).await + } + pub async fn start_job( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.JobOrchestrationService/StartJob", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("storage.JobOrchestrationService", "StartJob")); + self.inner.unary(req, path, codec).await + } + pub async fn cancel_job( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.JobOrchestrationService/CancelJob", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("storage.JobOrchestrationService", "CancelJob")); + self.inner.unary(req, path, codec).await + } + pub async fn get_job_state( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.JobOrchestrationService/GetJobState", ); let mut req = request.into_request(); req.extensions_mut() .insert( - GrpcMethod::new( - "storage.TaskInstanceManagementService", - "RegisterTaskInstance", - ), + GrpcMethod::new("storage.JobOrchestrationService", "GetJobState"), ); self.inner.unary(req, path, codec).await } - pub async fn report_task_success( + pub async fn get_job_outputs( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, > { self.inner @@ -284,51 +889,2393 @@ pub mod task_instance_management_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/storage.TaskInstanceManagementService/ReportTaskSuccess", + "/storage.JobOrchestrationService/GetJobOutputs", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("storage.JobOrchestrationService", "GetJobOutputs"), + ); + self.inner.unary(req, path, codec).await + } + pub async fn get_job_error( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.JobOrchestrationService/GetJobError", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("storage.JobOrchestrationService", "GetJobError"), + ); + self.inner.unary(req, path, codec).await + } + pub async fn delete_expired_terminated_jobs( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.JobOrchestrationService/DeleteExpiredTerminatedJobs", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( - "storage.TaskInstanceManagementService", - "ReportTaskSuccess", + "storage.JobOrchestrationService", + "DeleteExpiredTerminatedJobs", ), ); self.inner.unary(req, path, codec).await } - pub async fn report_task_failure( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/storage.TaskInstanceManagementService/ReportTaskFailure", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "storage.TaskInstanceManagementService", - "ReportTaskFailure", - ), - ); - self.inner.unary(req, path, codec).await + pub async fn resend_ready_tasks( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.JobOrchestrationService/ResendReadyTasks", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.JobOrchestrationService", + "ResendReadyTasks", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated client implementations. +pub mod task_instance_management_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct TaskInstanceManagementServiceClient { + inner: tonic::client::Grpc, + } + impl TaskInstanceManagementServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl TaskInstanceManagementServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> TaskInstanceManagementServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + TaskInstanceManagementServiceClient::new( + InterceptedService::new(inner, interceptor), + ) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn register_task_instance( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.TaskInstanceManagementService/RegisterTaskInstance", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.TaskInstanceManagementService", + "RegisterTaskInstance", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn report_task_success( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.TaskInstanceManagementService/ReportTaskSuccess", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.TaskInstanceManagementService", + "ReportTaskSuccess", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn report_task_failure( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.TaskInstanceManagementService/ReportTaskFailure", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.TaskInstanceManagementService", + "ReportTaskFailure", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated client implementations. +pub mod inbound_queue_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct InboundQueueServiceClient { + inner: tonic::client::Grpc, + } + impl InboundQueueServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl InboundQueueServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InboundQueueServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + InboundQueueServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn poll_ready_tasks( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.InboundQueueService/PollReadyTasks", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("storage.InboundQueueService", "PollReadyTasks"), + ); + self.inner.unary(req, path, codec).await + } + pub async fn poll_ready_commit_tasks( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.InboundQueueService/PollReadyCommitTasks", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.InboundQueueService", + "PollReadyCommitTasks", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn poll_ready_cleanup_tasks( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.InboundQueueService/PollReadyCleanupTasks", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.InboundQueueService", + "PollReadyCleanupTasks", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated client implementations. +pub mod resource_group_management_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct ResourceGroupManagementServiceClient { + inner: tonic::client::Grpc, + } + impl ResourceGroupManagementServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl ResourceGroupManagementServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> ResourceGroupManagementServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + ResourceGroupManagementServiceClient::new( + InterceptedService::new(inner, interceptor), + ) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn add_resource_group( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.ResourceGroupManagementService/AddResourceGroup", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.ResourceGroupManagementService", + "AddResourceGroup", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn verify_resource_group( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.ResourceGroupManagementService/VerifyResourceGroup", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.ResourceGroupManagementService", + "VerifyResourceGroup", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn delete_resource_group( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.ResourceGroupManagementService/DeleteResourceGroup", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.ResourceGroupManagementService", + "DeleteResourceGroup", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated client implementations. +pub mod execution_manager_liveness_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct ExecutionManagerLivenessServiceClient { + inner: tonic::client::Grpc, + } + impl ExecutionManagerLivenessServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl ExecutionManagerLivenessServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> ExecutionManagerLivenessServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + ExecutionManagerLivenessServiceClient::new( + InterceptedService::new(inner, interceptor), + ) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn register_execution_manager( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.ExecutionManagerLivenessService/RegisterExecutionManager", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.ExecutionManagerLivenessService", + "RegisterExecutionManager", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn update_execution_manager_heartbeat( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.ExecutionManagerLivenessService/UpdateExecutionManagerHeartbeat", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "storage.ExecutionManagerLivenessService", + "UpdateExecutionManagerHeartbeat", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated client implementations. +pub mod session_management_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct SessionManagementServiceClient { + inner: tonic::client::Grpc, + } + impl SessionManagementServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl SessionManagementServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> SessionManagementServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + SessionManagementServiceClient::new( + InterceptedService::new(inner, interceptor), + ) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn get_session( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/storage.SessionManagementService/GetSession", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("storage.SessionManagementService", "GetSession"), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod job_orchestration_service_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with JobOrchestrationServiceServer. + #[async_trait] + pub trait JobOrchestrationService: std::marker::Send + std::marker::Sync + 'static { + async fn submit_job( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn start_job( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn cancel_job( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_job_state( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_job_outputs( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_job_error( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn delete_expired_terminated_jobs( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn resend_ready_tasks( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct JobOrchestrationServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl JobOrchestrationServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> + for JobOrchestrationServiceServer + where + T: JobOrchestrationService, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/storage.JobOrchestrationService/SubmitJob" => { + #[allow(non_camel_case_types)] + struct SubmitJobSvc(pub Arc); + impl< + T: JobOrchestrationService, + > tonic::server::UnaryService + for SubmitJobSvc { + type Response = super::SubmitJobResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::submit_job(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = SubmitJobSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.JobOrchestrationService/StartJob" => { + #[allow(non_camel_case_types)] + struct StartJobSvc(pub Arc); + impl< + T: JobOrchestrationService, + > tonic::server::UnaryService + for StartJobSvc { + type Response = super::JobStateResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::start_job(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = StartJobSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.JobOrchestrationService/CancelJob" => { + #[allow(non_camel_case_types)] + struct CancelJobSvc(pub Arc); + impl< + T: JobOrchestrationService, + > tonic::server::UnaryService + for CancelJobSvc { + type Response = super::JobStateResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::cancel_job(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = CancelJobSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.JobOrchestrationService/GetJobState" => { + #[allow(non_camel_case_types)] + struct GetJobStateSvc(pub Arc); + impl< + T: JobOrchestrationService, + > tonic::server::UnaryService + for GetJobStateSvc { + type Response = super::JobStateResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_job_state( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetJobStateSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.JobOrchestrationService/GetJobOutputs" => { + #[allow(non_camel_case_types)] + struct GetJobOutputsSvc(pub Arc); + impl< + T: JobOrchestrationService, + > tonic::server::UnaryService + for GetJobOutputsSvc { + type Response = super::JobOutputsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_job_outputs( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetJobOutputsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.JobOrchestrationService/GetJobError" => { + #[allow(non_camel_case_types)] + struct GetJobErrorSvc(pub Arc); + impl< + T: JobOrchestrationService, + > tonic::server::UnaryService + for GetJobErrorSvc { + type Response = super::JobErrorResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_job_error( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetJobErrorSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.JobOrchestrationService/DeleteExpiredTerminatedJobs" => { + #[allow(non_camel_case_types)] + struct DeleteExpiredTerminatedJobsSvc( + pub Arc, + ); + impl< + T: JobOrchestrationService, + > tonic::server::UnaryService< + super::DeleteExpiredTerminatedJobsRequest, + > for DeleteExpiredTerminatedJobsSvc { + type Response = super::DeleteExpiredTerminatedJobsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::DeleteExpiredTerminatedJobsRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::delete_expired_terminated_jobs( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = DeleteExpiredTerminatedJobsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.JobOrchestrationService/ResendReadyTasks" => { + #[allow(non_camel_case_types)] + struct ResendReadyTasksSvc(pub Arc); + impl< + T: JobOrchestrationService, + > tonic::server::UnaryService + for ResendReadyTasksSvc { + type Response = super::JobManagementOperationResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::resend_ready_tasks( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ResendReadyTasksSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for JobOrchestrationServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "storage.JobOrchestrationService"; + impl tonic::server::NamedService for JobOrchestrationServiceServer { + const NAME: &'static str = SERVICE_NAME; + } +} +/// Generated server implementations. +pub mod task_instance_management_service_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with TaskInstanceManagementServiceServer. + #[async_trait] + pub trait TaskInstanceManagementService: std::marker::Send + std::marker::Sync + 'static { + async fn register_task_instance( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn report_task_success( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn report_task_failure( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct TaskInstanceManagementServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl TaskInstanceManagementServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> + for TaskInstanceManagementServiceServer + where + T: TaskInstanceManagementService, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/storage.TaskInstanceManagementService/RegisterTaskInstance" => { + #[allow(non_camel_case_types)] + struct RegisterTaskInstanceSvc( + pub Arc, + ); + impl< + T: TaskInstanceManagementService, + > tonic::server::UnaryService + for RegisterTaskInstanceSvc { + type Response = super::RegisterTaskInstanceResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::register_task_instance( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = RegisterTaskInstanceSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.TaskInstanceManagementService/ReportTaskSuccess" => { + #[allow(non_camel_case_types)] + struct ReportTaskSuccessSvc( + pub Arc, + ); + impl< + T: TaskInstanceManagementService, + > tonic::server::UnaryService + for ReportTaskSuccessSvc { + type Response = super::TaskInstanceOperationResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::report_task_success( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReportTaskSuccessSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.TaskInstanceManagementService/ReportTaskFailure" => { + #[allow(non_camel_case_types)] + struct ReportTaskFailureSvc( + pub Arc, + ); + impl< + T: TaskInstanceManagementService, + > tonic::server::UnaryService + for ReportTaskFailureSvc { + type Response = super::TaskInstanceOperationResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::report_task_failure( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReportTaskFailureSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for TaskInstanceManagementServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "storage.TaskInstanceManagementService"; + impl tonic::server::NamedService for TaskInstanceManagementServiceServer { + const NAME: &'static str = SERVICE_NAME; + } +} +/// Generated server implementations. +pub mod inbound_queue_service_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with InboundQueueServiceServer. + #[async_trait] + pub trait InboundQueueService: std::marker::Send + std::marker::Sync + 'static { + async fn poll_ready_tasks( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn poll_ready_commit_tasks( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn poll_ready_cleanup_tasks( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct InboundQueueServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl InboundQueueServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for InboundQueueServiceServer + where + T: InboundQueueService, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/storage.InboundQueueService/PollReadyTasks" => { + #[allow(non_camel_case_types)] + struct PollReadyTasksSvc(pub Arc); + impl< + T: InboundQueueService, + > tonic::server::UnaryService + for PollReadyTasksSvc { + type Response = super::PollReadyTasksResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::poll_ready_tasks( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = PollReadyTasksSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.InboundQueueService/PollReadyCommitTasks" => { + #[allow(non_camel_case_types)] + struct PollReadyCommitTasksSvc(pub Arc); + impl< + T: InboundQueueService, + > tonic::server::UnaryService + for PollReadyCommitTasksSvc { + type Response = super::PollReadyTasksResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::poll_ready_commit_tasks( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = PollReadyCommitTasksSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.InboundQueueService/PollReadyCleanupTasks" => { + #[allow(non_camel_case_types)] + struct PollReadyCleanupTasksSvc(pub Arc); + impl< + T: InboundQueueService, + > tonic::server::UnaryService + for PollReadyCleanupTasksSvc { + type Response = super::PollReadyTasksResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::poll_ready_cleanup_tasks( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = PollReadyCleanupTasksSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for InboundQueueServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "storage.InboundQueueService"; + impl tonic::server::NamedService for InboundQueueServiceServer { + const NAME: &'static str = SERVICE_NAME; + } +} +/// Generated server implementations. +pub mod resource_group_management_service_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with ResourceGroupManagementServiceServer. + #[async_trait] + pub trait ResourceGroupManagementService: std::marker::Send + std::marker::Sync + 'static { + async fn add_resource_group( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn verify_resource_group( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn delete_resource_group( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct ResourceGroupManagementServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl ResourceGroupManagementServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> + for ResourceGroupManagementServiceServer + where + T: ResourceGroupManagementService, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/storage.ResourceGroupManagementService/AddResourceGroup" => { + #[allow(non_camel_case_types)] + struct AddResourceGroupSvc( + pub Arc, + ); + impl< + T: ResourceGroupManagementService, + > tonic::server::UnaryService + for AddResourceGroupSvc { + type Response = super::ResourceGroupIdResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::add_resource_group( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = AddResourceGroupSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.ResourceGroupManagementService/VerifyResourceGroup" => { + #[allow(non_camel_case_types)] + struct VerifyResourceGroupSvc( + pub Arc, + ); + impl< + T: ResourceGroupManagementService, + > tonic::server::UnaryService + for VerifyResourceGroupSvc { + type Response = super::ResourceGroupOperationResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::verify_resource_group( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = VerifyResourceGroupSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/storage.ResourceGroupManagementService/DeleteResourceGroup" => { + #[allow(non_camel_case_types)] + struct DeleteResourceGroupSvc( + pub Arc, + ); + impl< + T: ResourceGroupManagementService, + > tonic::server::UnaryService + for DeleteResourceGroupSvc { + type Response = super::ResourceGroupOperationResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::delete_resource_group( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = DeleteResourceGroupSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for ResourceGroupManagementServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } } } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "storage.ResourceGroupManagementService"; + impl tonic::server::NamedService for ResourceGroupManagementServiceServer { + const NAME: &'static str = SERVICE_NAME; + } } /// Generated server implementations. -pub mod task_instance_management_service_server { +pub mod execution_manager_liveness_service_server { #![allow( unused_variables, dead_code, @@ -337,40 +3284,33 @@ pub mod task_instance_management_service_server { clippy::let_unit_value, )] use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with TaskInstanceManagementServiceServer. + /// Generated trait containing gRPC methods that should be implemented for use with ExecutionManagerLivenessServiceServer. #[async_trait] - pub trait TaskInstanceManagementService: std::marker::Send + std::marker::Sync + 'static { - async fn register_task_instance( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; - async fn report_task_success( + pub trait ExecutionManagerLivenessService: std::marker::Send + std::marker::Sync + 'static { + async fn register_execution_manager( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; - async fn report_task_failure( + async fn update_execution_manager_heartbeat( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; } #[derive(Debug)] - pub struct TaskInstanceManagementServiceServer { + pub struct ExecutionManagerLivenessServiceServer { inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - impl TaskInstanceManagementServiceServer { + impl ExecutionManagerLivenessServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } @@ -422,9 +3362,9 @@ pub mod task_instance_management_service_server { } } impl tonic::codegen::Service> - for TaskInstanceManagementServiceServer + for ExecutionManagerLivenessServiceServer where - T: TaskInstanceManagementService, + T: ExecutionManagerLivenessService, B: Body + std::marker::Send + 'static, B::Error: Into + std::marker::Send + 'static, { @@ -439,27 +3379,31 @@ pub mod task_instance_management_service_server { } fn call(&mut self, req: http::Request) -> Self::Future { match req.uri().path() { - "/storage.TaskInstanceManagementService/RegisterTaskInstance" => { + "/storage.ExecutionManagerLivenessService/RegisterExecutionManager" => { #[allow(non_camel_case_types)] - struct RegisterTaskInstanceSvc( + struct RegisterExecutionManagerSvc< + T: ExecutionManagerLivenessService, + >( pub Arc, ); impl< - T: TaskInstanceManagementService, - > tonic::server::UnaryService - for RegisterTaskInstanceSvc { - type Response = super::RegisterTaskInstanceResponse; + T: ExecutionManagerLivenessService, + > tonic::server::UnaryService + for RegisterExecutionManagerSvc { + type Response = super::RegisterExecutionManagerResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request< + super::RegisterExecutionManagerRequest, + >, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::register_task_instance( + ::register_execution_manager( &inner, request, ) @@ -474,7 +3418,7 @@ pub mod task_instance_management_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = RegisterTaskInstanceSvc(inner); + let method = RegisterExecutionManagerSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -490,27 +3434,29 @@ pub mod task_instance_management_service_server { }; Box::pin(fut) } - "/storage.TaskInstanceManagementService/ReportTaskSuccess" => { + "/storage.ExecutionManagerLivenessService/UpdateExecutionManagerHeartbeat" => { #[allow(non_camel_case_types)] - struct ReportTaskSuccessSvc( + struct UpdateExecutionManagerHeartbeatSvc< + T: ExecutionManagerLivenessService, + >( pub Arc, ); impl< - T: TaskInstanceManagementService, - > tonic::server::UnaryService - for ReportTaskSuccessSvc { - type Response = super::StorageOperationResponse; + T: ExecutionManagerLivenessService, + > tonic::server::UnaryService + for UpdateExecutionManagerHeartbeatSvc { + type Response = super::UpdateExecutionManagerHeartbeatResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::report_task_success( + ::update_execution_manager_heartbeat( &inner, request, ) @@ -525,7 +3471,7 @@ pub mod task_instance_management_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = ReportTaskSuccessSvc(inner); + let method = UpdateExecutionManagerHeartbeatSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -541,27 +3487,160 @@ pub mod task_instance_management_service_server { }; Box::pin(fut) } - "/storage.TaskInstanceManagementService/ReportTaskFailure" => { + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for ExecutionManagerLivenessServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "storage.ExecutionManagerLivenessService"; + impl tonic::server::NamedService for ExecutionManagerLivenessServiceServer { + const NAME: &'static str = SERVICE_NAME; + } +} +/// Generated server implementations. +pub mod session_management_service_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with SessionManagementServiceServer. + #[async_trait] + pub trait SessionManagementService: std::marker::Send + std::marker::Sync + 'static { + async fn get_session( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct SessionManagementServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl SessionManagementServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> + for SessionManagementServiceServer + where + T: SessionManagementService, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/storage.SessionManagementService/GetSession" => { #[allow(non_camel_case_types)] - struct ReportTaskFailureSvc( - pub Arc, - ); + struct GetSessionSvc(pub Arc); impl< - T: TaskInstanceManagementService, - > tonic::server::UnaryService - for ReportTaskFailureSvc { - type Response = super::StorageOperationResponse; + T: SessionManagementService, + > tonic::server::UnaryService for GetSessionSvc { + type Response = super::GetSessionResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::report_task_failure( + ::get_session( &inner, request, ) @@ -576,7 +3655,7 @@ pub mod task_instance_management_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = ReportTaskFailureSvc(inner); + let method = GetSessionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -612,7 +3691,7 @@ pub mod task_instance_management_service_server { } } } - impl Clone for TaskInstanceManagementServiceServer { + impl Clone for SessionManagementServiceServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -625,8 +3704,8 @@ pub mod task_instance_management_service_server { } } /// Generated gRPC service name - pub const SERVICE_NAME: &str = "storage.TaskInstanceManagementService"; - impl tonic::server::NamedService for TaskInstanceManagementServiceServer { + pub const SERVICE_NAME: &str = "storage.SessionManagementService"; + impl tonic::server::NamedService for SessionManagementServiceServer { const NAME: &'static str = SERVICE_NAME; } } diff --git a/components/spider-proto-rust/src/id.rs b/components/spider-proto-rust/src/id.rs index ef21bcd8..bd21a55a 100644 --- a/components/spider-proto-rust/src/id.rs +++ b/components/spider-proto-rust/src/id.rs @@ -2,7 +2,10 @@ use spider_core::types::id::TaskId; -use crate::storage::{self, task_id}; +use crate::{ + error::Error, + storage::{self, task_id}, +}; impl From for storage::TaskId { fn from(task_id: TaskId) -> Self { @@ -17,6 +20,21 @@ impl From for storage::TaskId { } } +impl TryFrom for TaskId { + type Error = Error; + + fn try_from(task_id: storage::TaskId) -> Result { + match task_id.kind { + Some(task_id::Kind::Index(task_index)) => Ok(Self::Index( + usize::try_from(task_index).map_err(|_| Error::TaskIndexOutOfRange(task_index))?, + )), + Some(task_id::Kind::Commit(_)) => Ok(Self::Commit), + Some(task_id::Kind::Cleanup(_)) => Ok(Self::Cleanup), + None => Err(Error::TaskIdKindMissing), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -41,4 +59,22 @@ mod tests { assert!(matches!(task_id.kind, Some(task_id::Kind::Cleanup(_)))); } + + #[test] + fn protocol_task_id_to_core_converts_index_task() { + let task_id = TaskId::try_from(storage::TaskId { + kind: Some(task_id::Kind::Index(7)), + }) + .expect("protocol task id conversion should succeed"); + + assert_eq!(task_id, TaskId::Index(7)); + } + + #[test] + fn protocol_task_id_to_core_rejects_missing_kind() { + let error = TaskId::try_from(storage::TaskId { kind: None }) + .expect_err("missing task id kind should fail"); + + assert!(matches!(error, Error::TaskIdKindMissing)); + } } diff --git a/components/spider-proto-rust/src/job.rs b/components/spider-proto-rust/src/job.rs new file mode 100644 index 00000000..93d167cb --- /dev/null +++ b/components/spider-proto-rust/src/job.rs @@ -0,0 +1,66 @@ +//! Helpers for converting Spider job values to protobuf fields. + +use spider_core::job::JobState; + +use crate::{error::Error, storage}; + +impl From for storage::JobState { + fn from(state: JobState) -> Self { + match state { + JobState::Ready => Self::Ready, + JobState::Running => Self::Running, + JobState::CommitReady => Self::CommitReady, + JobState::CleanupReady => Self::CleanupReady, + JobState::Succeeded => Self::Succeeded, + JobState::Failed => Self::Failed, + JobState::Cancelled => Self::Cancelled, + } + } +} + +impl TryFrom for JobState { + type Error = Error; + + fn try_from(state: storage::JobState) -> Result { + match state { + storage::JobState::Unspecified => Err(Error::JobStateUnspecified), + storage::JobState::Ready => Ok(Self::Ready), + storage::JobState::Running => Ok(Self::Running), + storage::JobState::CommitReady => Ok(Self::CommitReady), + storage::JobState::CleanupReady => Ok(Self::CleanupReady), + storage::JobState::Succeeded => Ok(Self::Succeeded), + storage::JobState::Failed => Ok(Self::Failed), + storage::JobState::Cancelled => Ok(Self::Cancelled), + } + } +} + +#[cfg(test)] +mod tests { + use spider_core::job::JobState; + + use crate::{error::Error, storage::JobState as ProtocolJobState}; + + #[test] + fn job_state_to_protocol_converts_succeeded() { + let protocol_state = ProtocolJobState::from(JobState::Succeeded); + + assert_eq!(protocol_state, ProtocolJobState::Succeeded); + } + + #[test] + fn protocol_job_state_to_core_converts_cleanup_ready() { + let state = JobState::try_from(ProtocolJobState::CleanupReady) + .expect("protocol job state conversion should succeed"); + + assert_eq!(state, JobState::CleanupReady); + } + + #[test] + fn protocol_job_state_to_core_rejects_unspecified() { + let error = JobState::try_from(ProtocolJobState::Unspecified) + .expect_err("unspecified job state should fail"); + + assert!(matches!(error, Error::JobStateUnspecified)); + } +} diff --git a/components/spider-proto-rust/src/lib.rs b/components/spider-proto-rust/src/lib.rs index d78e8f0d..9dcff1c9 100644 --- a/components/spider-proto-rust/src/lib.rs +++ b/components/spider-proto-rust/src/lib.rs @@ -1,6 +1,8 @@ //! Rust gRPC protocol definitions generated from Spider protobuf files. +pub mod error; pub mod id; +pub mod job; #[allow(clippy::all, clippy::nursery, clippy::pedantic)] pub mod storage { diff --git a/components/spider-proto/storage/storage.proto b/components/spider-proto/storage/storage.proto index 3d6f5483..df771fb7 100644 --- a/components/spider-proto/storage/storage.proto +++ b/components/spider-proto/storage/storage.proto @@ -2,10 +2,132 @@ syntax = "proto3"; package storage; +service JobOrchestrationService { + rpc SubmitJob(SubmitJobRequest) returns (SubmitJobResponse); + rpc StartJob(JobIdRequest) returns (JobStateResponse); + rpc CancelJob(JobIdRequest) returns (JobStateResponse); + rpc GetJobState(JobIdRequest) returns (JobStateResponse); + rpc GetJobOutputs(JobIdRequest) returns (JobOutputsResponse); + rpc GetJobError(JobIdRequest) returns (JobErrorResponse); + rpc DeleteExpiredTerminatedJobs(DeleteExpiredTerminatedJobsRequest) + returns (DeleteExpiredTerminatedJobsResponse); + rpc ResendReadyTasks(ResendReadyTasksRequest) returns (JobManagementOperationResponse); +} + service TaskInstanceManagementService { rpc RegisterTaskInstance(RegisterTaskInstanceRequest) returns (RegisterTaskInstanceResponse); - rpc ReportTaskSuccess(ReportTaskSuccessRequest) returns (StorageOperationResponse); - rpc ReportTaskFailure(ReportTaskFailureRequest) returns (StorageOperationResponse); + rpc ReportTaskSuccess(ReportTaskSuccessRequest) returns (TaskInstanceOperationResponse); + rpc ReportTaskFailure(ReportTaskFailureRequest) returns (TaskInstanceOperationResponse); +} + +service InboundQueueService { + rpc PollReadyTasks(PollReadyTasksRequest) returns (PollReadyTasksResponse); + rpc PollReadyCommitTasks(PollReadyTasksRequest) returns (PollReadyTasksResponse); + rpc PollReadyCleanupTasks(PollReadyTasksRequest) returns (PollReadyTasksResponse); +} + +service ResourceGroupManagementService { + rpc AddResourceGroup(AddResourceGroupRequest) returns (ResourceGroupIdResponse); + rpc VerifyResourceGroup(VerifyResourceGroupRequest) returns (ResourceGroupOperationResponse); + rpc DeleteResourceGroup(ResourceGroupIdRequest) returns (ResourceGroupOperationResponse); +} + +service ExecutionManagerLivenessService { + rpc RegisterExecutionManager(RegisterExecutionManagerRequest) + returns (RegisterExecutionManagerResponse); + rpc UpdateExecutionManagerHeartbeat(ExecutionManagerIdRequest) + returns (UpdateExecutionManagerHeartbeatResponse); +} + +service SessionManagementService { + rpc GetSession(Void) returns (GetSessionResponse); +} + +message SubmitJobRequest { + uint64 resource_group_id = 1; + bytes serialized_task_graph = 2; + bytes serialized_inputs = 3; + uint64 session_id = 4; +} + +message SubmitJobResponse { + oneof result { + uint64 job_id = 1; + JobOrchestrationError error = 2; + } +} + +message JobIdRequest { + uint64 job_id = 1; +} + +message JobStateResponse { + oneof result { + JobState state = 1; + JobOrchestrationError error = 2; + } +} + +message JobOutputsResponse { + oneof result { + JobOutputs outputs = 1; + JobOrchestrationError error = 2; + } +} + +message JobOutputs { + repeated bytes outputs = 1; +} + +message JobErrorResponse { + oneof result { + string error_message = 1; + JobOrchestrationError error = 2; + } +} + +message DeleteExpiredTerminatedJobsRequest { + uint64 expire_after_sec = 1; + uint64 session_id = 2; +} + +message DeleteExpiredTerminatedJobsResponse { + oneof result { + DeletedJobs deleted_jobs = 1; + JobOrchestrationError error = 2; + } +} + +message DeletedJobs { + repeated uint64 job_ids = 1; +} + +message ResendReadyTasksRequest { + optional uint64 job_id = 1; + uint64 session_id = 2; +} + +message PollReadyTasksRequest { + uint64 max_items = 1; + uint64 wait_ms = 2; +} + +message PollReadyTasksResponse { + oneof result { + ReadyTasks tasks = 1; + InboundQueueResponseError error = 2; + } +} + +message ReadyTasks { + uint64 session_id = 1; + repeated ReadyTask tasks = 2; +} + +message ReadyTask { + uint64 resource_group_id = 1; + uint64 job_id = 2; + TaskId task_id = 3; } message RegisterTaskInstanceRequest { @@ -18,7 +140,7 @@ message RegisterTaskInstanceRequest { message RegisterTaskInstanceResponse { oneof result { bytes execution_context = 1; - StorageError error = 2; + TaskInstanceManagementError error = 2; } } @@ -38,6 +160,61 @@ message ReportTaskFailureRequest { string error_message = 5; } +message AddResourceGroupRequest { + string external_resource_group_id = 1; + bytes password = 2; + uint64 session_id = 3; +} + +message ResourceGroupIdRequest { + uint64 resource_group_id = 1; + uint64 session_id = 2; +} + +message ResourceGroupIdResponse { + oneof result { + uint64 resource_group_id = 1; + ResourceGroupManagementError error = 2; + } +} + +message VerifyResourceGroupRequest { + uint64 resource_group_id = 1; + bytes password = 2; + uint64 session_id = 3; +} + +message RegisterExecutionManagerRequest { + string ip_address = 1; +} + +message ExecutionManagerIdRequest { + uint64 execution_manager_id = 1; +} + +message ExecutionManagerRegistration { + uint64 execution_manager_id = 1; + uint64 session_id = 2; +} + +message RegisterExecutionManagerResponse { + oneof result { + ExecutionManagerRegistration registration = 1; + ExecutionManagerLivenessError error = 2; + } +} + +message UpdateExecutionManagerHeartbeatResponse { + oneof result { + uint64 session_id = 1; + ExecutionManagerLivenessError error = 2; + } +} + +message GetSessionResponse { + uint64 session_id = 1; +} + message TaskId { oneof kind { uint64 index = 1; @@ -46,26 +223,101 @@ message TaskId { } } -message StorageOperationResponse { +enum JobState { + JOB_STATE_UNSPECIFIED = 0; + READY = 1; + RUNNING = 2; + COMMIT_READY = 3; + CLEANUP_READY = 4; + SUCCEEDED = 5; + FAILED = 6; + CANCELLED = 7; +} + +message JobManagementOperationResponse { + oneof result { + Void ok = 1; + JobOrchestrationError error = 2; + } +} + +message TaskInstanceOperationResponse { + oneof result { + Void ok = 1; + TaskInstanceManagementError error = 2; + } +} + +message ResourceGroupOperationResponse { oneof result { Void ok = 1; - StorageError error = 2; + ResourceGroupManagementError error = 2; } } message Void {} -message StorageError { +message JobOrchestrationError { + enum ErrCode { + ERR_CODE_UNSPECIFIED = 0; + STALE_SESSION = 1; + SERVER = 2; + INVALID_INPUT = 3; + JOB_NOT_FOUND = 4; + } + + ErrCode err_code = 1; + string message = 2; + uint64 storage_session = 3; +} + +message TaskInstanceManagementError { enum ErrCode { ERR_CODE_UNSPECIFIED = 0; STALE_SESSION = 1; CACHE_STALE = 2; - TRANSPORT = 3; - SERVER = 4; - INVALID_INPUT = 5; + SERVER = 3; + INVALID_INPUT = 4; + } + + ErrCode err_code = 1; + string message = 2; + uint64 storage_session = 3; +} + +message InboundQueueResponseError { + enum ErrCode { + ERR_CODE_UNSPECIFIED = 0; + INBOUND_CLOSED = 1; + SERVER = 2; + INVALID_INPUT = 3; + } + + ErrCode err_code = 1; + string message = 2; +} + +message ResourceGroupManagementError { + enum ErrCode { + ERR_CODE_UNSPECIFIED = 0; + STALE_SESSION = 1; + SERVER = 2; + INVALID_INPUT = 3; } ErrCode err_code = 1; string message = 2; uint64 storage_session = 3; } + +message ExecutionManagerLivenessError { + enum ErrCode { + ERR_CODE_UNSPECIFIED = 0; + MARKED_DEAD = 1; + INVALID_INPUT = 2; + SERVER = 3; + } + + ErrCode err_code = 1; + string message = 2; +} diff --git a/components/spider-scheduler/Cargo.toml b/components/spider-scheduler/Cargo.toml index ee803e17..3254c747 100644 --- a/components/spider-scheduler/Cargo.toml +++ b/components/spider-scheduler/Cargo.toml @@ -11,9 +11,11 @@ path = "src/lib.rs" async-channel = "2.3.1" async-trait = "0.1.89" spider-core = { path = "../spider-core" } +spider-proto-rust = { path = "../spider-proto-rust" } thiserror = "2.0.18" tokio = { version = "1.52.3", features = ["sync", "time"] } tokio-util = "0.7.18" +tonic = "0.12.3" [dev-dependencies] anyhow = "1.0.102" diff --git a/components/spider-scheduler/src/error.rs b/components/spider-scheduler/src/error.rs index 6a852c46..18519da2 100644 --- a/components/spider-scheduler/src/error.rs +++ b/components/spider-scheduler/src/error.rs @@ -12,6 +12,25 @@ pub enum StorageClientError { /// No job with the requested identifier exists. #[error("job not found: {0:?}")] JobNotFound(JobId), + + /// The scheduler's storage session is stale. + #[error("stale storage session: {storage_session:?}")] + StaleSession { + /// Storage's current session ID. + storage_session: SessionId, + }, + + /// The storage server returned an invalid input error. + #[error("invalid storage request: {0}")] + InvalidInput(String), + + /// The storage server returned an otherwise-uncategorized error. + #[error("storage server error: {0}")] + Server(String), + + /// The storage transport failed or returned malformed data. + #[error("storage transport error: {0}")] + Transport(String), } /// Errors returned by the scheduler runtime and its components. diff --git a/components/spider-scheduler/src/lib.rs b/components/spider-scheduler/src/lib.rs index bddd0750..9a16cd97 100644 --- a/components/spider-scheduler/src/lib.rs +++ b/components/spider-scheduler/src/lib.rs @@ -41,6 +41,6 @@ pub use crate::{ core::SchedulerCore, dispatch_queue::{DispatchQueueSink, DispatchQueueSource}, error::{SchedulerError, StorageClientError}, - storage_client::SchedulerStorageClient, + storage_client::{GrpcSchedulerStorageClient, SchedulerStorageClient}, types::{InboundEntry, TaskAssignment}, }; diff --git a/components/spider-scheduler/src/storage_client/grpc.rs b/components/spider-scheduler/src/storage_client/grpc.rs new file mode 100644 index 00000000..4ee3f464 --- /dev/null +++ b/components/spider-scheduler/src/storage_client/grpc.rs @@ -0,0 +1,366 @@ +//! gRPC-backed [`SchedulerStorageClient`] implementation. + +use std::time::Duration; + +use async_trait::async_trait; +use spider_core::{ + job::JobState, + types::id::{JobId, ResourceGroupId, SessionId, TaskId}, +}; +use spider_proto_rust::storage::{ + self, + inbound_queue_response_error, + inbound_queue_service_client::InboundQueueServiceClient, + job_orchestration_error, + job_orchestration_service_client::JobOrchestrationServiceClient, + job_state_response, + poll_ready_tasks_response, +}; +use tonic::transport::{Channel, Endpoint}; + +use crate::{ + error::StorageClientError, + storage_client::SchedulerStorageClient, + types::InboundEntry, +}; + +/// gRPC-backed [`SchedulerStorageClient`] implementation. +#[derive(Debug, Clone)] +pub struct GrpcSchedulerStorageClient { + scheduler_client: InboundQueueServiceClient, + job_client: JobOrchestrationServiceClient, +} + +impl GrpcSchedulerStorageClient { + /// Connects to the storage gRPC endpoint. + /// + /// # Returns + /// + /// A new [`GrpcSchedulerStorageClient`] connected to `endpoint` on success. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * [`StorageClientError::Transport`] if tonic cannot create or connect to the endpoint. + pub async fn connect(endpoint: Endpoint) -> Result { + let channel = endpoint.connect().await.map_err(to_transport_error)?; + + Ok(Self { + scheduler_client: InboundQueueServiceClient::new(channel.clone()), + job_client: JobOrchestrationServiceClient::new(channel), + }) + } +} + +#[async_trait] +impl SchedulerStorageClient for GrpcSchedulerStorageClient { + async fn poll_ready( + &self, + max_items: usize, + wait: Duration, + ) -> Result<(SessionId, Vec), StorageClientError> { + let request = poll_ready_tasks_request(max_items, wait)?; + let response = self + .scheduler_client + .clone() + .poll_ready_tasks(request) + .await + .map_err(to_transport_error)? + .into_inner(); + poll_ready_tasks_response_to_result(response) + } + + async fn poll_commit_ready( + &self, + max_items: usize, + wait: Duration, + ) -> Result<(SessionId, Vec), StorageClientError> { + let request = poll_ready_tasks_request(max_items, wait)?; + let response = self + .scheduler_client + .clone() + .poll_ready_commit_tasks(request) + .await + .map_err(to_transport_error)? + .into_inner(); + poll_ready_tasks_response_to_result(response) + } + + async fn poll_cleanup_ready( + &self, + max_items: usize, + wait: Duration, + ) -> Result<(SessionId, Vec), StorageClientError> { + let request = poll_ready_tasks_request(max_items, wait)?; + let response = self + .scheduler_client + .clone() + .poll_ready_cleanup_tasks(request) + .await + .map_err(to_transport_error)? + .into_inner(); + poll_ready_tasks_response_to_result(response) + } + + async fn job_state(&self, job_id: JobId) -> Result { + let request = storage::JobIdRequest { + job_id: job_id.get(), + }; + let response = self + .job_client + .clone() + .get_job_state(request) + .await + .map_err(to_transport_error)? + .into_inner(); + job_state_response_to_result(response, job_id) + } +} + +impl From for StorageClientError { + fn from(error: storage::InboundQueueResponseError) -> Self { + match inbound_queue_response_error::ErrCode::try_from(error.err_code) { + Ok(inbound_queue_response_error::ErrCode::InboundClosed) => Self::InboundClosed, + Ok(inbound_queue_response_error::ErrCode::InvalidInput) => { + Self::InvalidInput(error.message) + } + Ok( + inbound_queue_response_error::ErrCode::Server + | inbound_queue_response_error::ErrCode::Unspecified, + ) => Self::Server(error.message), + Err(error) => Self::Transport(format!("unknown scheduler storage error kind: {error}")), + } + } +} + +/// # Returns +/// +/// A [`storage::PollReadyTasksRequest`] carrying `max_items` and `wait` on success. +/// +/// # Errors +/// +/// Returns an error if: +/// +/// * [`StorageClientError::InvalidInput`] if either value cannot fit in the protobuf field type. +fn poll_ready_tasks_request( + max_items: usize, + wait: Duration, +) -> Result { + Ok(storage::PollReadyTasksRequest { + max_items: u64::try_from(max_items).map_err(to_invalid_input_error)?, + wait_ms: u64::try_from(wait.as_millis()).map_err(to_invalid_input_error)?, + }) +} + +/// # Returns +/// +/// [`storage::PollReadyTasksResponse`] converted into +/// [`Result<(SessionId, Vec), StorageClientError>`]. +fn poll_ready_tasks_response_to_result( + response: storage::PollReadyTasksResponse, +) -> Result<(SessionId, Vec), StorageClientError> { + match response.result { + Some(poll_ready_tasks_response::Result::Tasks(tasks)) => ready_tasks_to_result(tasks), + Some(poll_ready_tasks_response::Result::Error(error)) => Err(error.into()), + None => Err(StorageClientError::Transport( + "poll ready tasks response missing `result` message".to_owned(), + )), + } +} + +/// # Returns +/// +/// [`storage::ReadyTasks`] converted into +/// [`Result<(SessionId, Vec), StorageClientError>`]. +fn ready_tasks_to_result( + tasks: storage::ReadyTasks, +) -> Result<(SessionId, Vec), StorageClientError> { + let session_id = tasks.session_id; + let entries = tasks + .tasks + .into_iter() + .map(ready_task_to_inbound_entry) + .collect::, _>>()?; + Ok((session_id, entries)) +} + +/// # Returns +/// +/// `task` converted into an [`InboundEntry`] on success. +/// +/// # Errors +/// +/// Returns an error if: +/// +/// * [`StorageClientError::Transport`] if `task` is missing or has an invalid task ID. +fn ready_task_to_inbound_entry( + task: storage::ReadyTask, +) -> Result { + let task_id = task + .task_id + .ok_or_else(|| StorageClientError::Transport("ready task missing task ID".to_owned())) + .and_then(|task_id| { + TaskId::try_from(task_id) + .map_err(|error| StorageClientError::Transport(error.to_string())) + })?; + Ok(InboundEntry { + resource_group_id: ResourceGroupId::from(task.resource_group_id), + job_id: JobId::from(task.job_id), + task_id, + }) +} + +/// # Returns +/// +/// [`storage::JobStateResponse`] converted into [`Result`]. +fn job_state_response_to_result( + response: storage::JobStateResponse, + job_id: JobId, +) -> Result { + match response.result { + Some(job_state_response::Result::State(state)) => storage::JobState::try_from(state) + .map_err(|error| StorageClientError::Transport(error.to_string())) + .and_then(|state| { + JobState::try_from(state) + .map_err(|error| StorageClientError::Transport(error.to_string())) + }), + Some(job_state_response::Result::Error(error)) => { + Err(job_orchestration_error_to_client_error(error, job_id)) + } + None => Err(StorageClientError::Transport( + "job state response missing `result` message".to_owned(), + )), + } +} + +/// # Returns +/// +/// [`storage::JobOrchestrationError`] converted into [`StorageClientError`]. +fn job_orchestration_error_to_client_error( + error: storage::JobOrchestrationError, + requested_job_id: JobId, +) -> StorageClientError { + match job_orchestration_error::ErrCode::try_from(error.err_code) { + Ok(job_orchestration_error::ErrCode::JobNotFound) => { + StorageClientError::JobNotFound(requested_job_id) + } + Ok(job_orchestration_error::ErrCode::StaleSession) => StorageClientError::StaleSession { + storage_session: error.storage_session, + }, + Ok(job_orchestration_error::ErrCode::InvalidInput) => { + StorageClientError::InvalidInput(error.message) + } + Ok( + job_orchestration_error::ErrCode::Server + | job_orchestration_error::ErrCode::Unspecified, + ) => StorageClientError::Server(error.message), + Err(error) => { + StorageClientError::Transport(format!("unknown job management error kind: {error}")) + } + } +} + +/// Converts a displayable transport-layer error into [`StorageClientError::Transport`]. +/// +/// # Returns +/// +/// A [`StorageClientError::Transport`] containing `error`'s display string. +fn to_transport_error(error: impl std::fmt::Display) -> StorageClientError { + StorageClientError::Transport(error.to_string()) +} + +/// Converts a displayable out-of-range error into [`StorageClientError::InvalidInput`]. +/// +/// # Returns +/// +/// A [`StorageClientError::InvalidInput`] containing `error`'s display string. +fn to_invalid_input_error(error: impl std::fmt::Display) -> StorageClientError { + StorageClientError::InvalidInput(error.to_string()) +} + +#[cfg(test)] +mod tests { + use spider_core::types::id::{JobId, ResourceGroupId, TaskId}; + use spider_proto_rust::storage::{ + self, + inbound_queue_response_error, + poll_ready_tasks_response, + }; + + use super::*; + + const SESSION_ID: SessionId = 11; + const RESOURCE_GROUP_ID: u64 = 3; + const JOB_ID: u64 = 5; + const TASK_INDEX: usize = 7; + + #[test] + fn poll_ready_tasks_response_converts_entries() { + let response = storage::PollReadyTasksResponse { + result: Some(poll_ready_tasks_response::Result::Tasks( + storage::ReadyTasks { + session_id: SESSION_ID, + tasks: vec![storage::ReadyTask { + resource_group_id: RESOURCE_GROUP_ID, + job_id: JOB_ID, + task_id: Some(storage::TaskId::from(TaskId::Index(TASK_INDEX))), + }], + }, + )), + }; + + let (session_id, entries) = poll_ready_tasks_response_to_result(response) + .expect("poll response conversion should succeed"); + + assert_eq!(session_id, SESSION_ID); + assert_eq!( + entries, + vec![InboundEntry { + resource_group_id: ResourceGroupId::from(RESOURCE_GROUP_ID), + job_id: JobId::from(JOB_ID), + task_id: TaskId::Index(TASK_INDEX), + }] + ); + } + + #[test] + fn poll_ready_tasks_response_rejects_missing_task_id() { + const MISSING_TASK_ID_MESSAGE: &str = "missing task ID"; + + let response = storage::PollReadyTasksResponse { + result: Some(poll_ready_tasks_response::Result::Tasks( + storage::ReadyTasks { + session_id: SESSION_ID, + tasks: vec![storage::ReadyTask { + resource_group_id: RESOURCE_GROUP_ID, + job_id: JOB_ID, + task_id: None, + }], + }, + )), + }; + + match poll_ready_tasks_response_to_result(response) { + Err(StorageClientError::Transport(message)) => { + assert!(message.contains(MISSING_TASK_ID_MESSAGE)); + } + result => panic!("unexpected poll response conversion result: {result:?}"), + } + } + + #[test] + fn inbound_queue_response_error_maps_inbound_closed() { + const ERROR_MESSAGE: &str = "closed"; + + let error = storage::InboundQueueResponseError { + err_code: inbound_queue_response_error::ErrCode::InboundClosed.into(), + message: ERROR_MESSAGE.to_owned(), + }; + + assert!(matches!( + StorageClientError::from(error), + StorageClientError::InboundClosed + )); + } +} diff --git a/components/spider-scheduler/src/storage_client.rs b/components/spider-scheduler/src/storage_client/mod.rs similarity index 98% rename from components/spider-scheduler/src/storage_client.rs rename to components/spider-scheduler/src/storage_client/mod.rs index 9f7adaf4..e8a607bd 100644 --- a/components/spider-scheduler/src/storage_client.rs +++ b/components/spider-scheduler/src/storage_client/mod.rs @@ -10,6 +10,10 @@ use spider_core::{ use crate::{error::StorageClientError, types::InboundEntry}; +pub mod grpc; + +pub use grpc::GrpcSchedulerStorageClient; + /// The scheduler's view of the storage layer. /// /// Abstracts the storage-owned inbound queue and the read-only queries a scheduling algorithm