Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/spider-execution-manager/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub mod liveness;
pub mod scheduler;
pub mod storage;

pub use grpc::GrpcStorageClient;
pub use grpc::{GrpcLivenessClient, GrpcStorageClient};
pub use liveness::{LivenessClient, LivenessResponseError, RegistrationResponse};
pub use scheduler::{SchedulerClient, SchedulerError, SchedulerResponse};
pub use storage::{StorageClient, StorageResponseError};
213 changes: 213 additions & 0 deletions components/spider-execution-manager/src/client/grpc/liveness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
//! gRPC-backed [`LivenessClient`] implementation.

use std::net::IpAddr;

use async_trait::async_trait;
use spider_core::types::id::{ExecutionManagerId, SessionId};
use spider_proto_rust::storage::{
self,
execution_manager_liveness_error,
execution_manager_liveness_service_client::ExecutionManagerLivenessServiceClient,
register_execution_manager_response,
update_execution_manager_heartbeat_response,
};
use tonic::transport::{Channel, Endpoint};

use crate::client::liveness::{LivenessClient, LivenessResponseError, RegistrationResponse};

/// gRPC-backed [`LivenessClient`] implementation.
#[derive(Debug, Clone)]
pub struct GrpcLivenessClient {
client: ExecutionManagerLivenessServiceClient<Channel>,
}

impl GrpcLivenessClient {
/// Connects to the storage gRPC endpoint.
///
/// # Returns
///
/// A new [`GrpcLivenessClient`] connected to `endpoint` on success.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`LivenessResponseError::Transport`] if tonic cannot create or connect to the endpoint.
pub async fn connect(endpoint: Endpoint) -> Result<Self, LivenessResponseError> {
ExecutionManagerLivenessServiceClient::connect(endpoint)
.await
.map(|client| Self { client })
.map_err(to_transport_error)
}
}

#[async_trait]
impl LivenessClient for GrpcLivenessClient {
async fn register(&self, ip: IpAddr) -> Result<RegistrationResponse, LivenessResponseError> {
let request = storage::RegisterExecutionManagerRequest {
ip_address: ip.to_string(),
};
let response = self
.client
.clone()
.register_execution_manager(request)
.await
.map_err(to_transport_error)?
.into_inner();

register_response_to_result(response)
}

async fn heartbeat(
&self,
em_id: ExecutionManagerId,
) -> Result<SessionId, LivenessResponseError> {
let request = storage::ExecutionManagerIdRequest {
execution_manager_id: em_id.get(),
};
let response = self
.client
.clone()
.update_execution_manager_heartbeat(request)
.await
.map_err(to_transport_error)?
.into_inner();

heartbeat_response_to_result(response)
}
}

impl From<storage::ExecutionManagerLivenessError> for LivenessResponseError {
fn from(error: storage::ExecutionManagerLivenessError) -> Self {
match execution_manager_liveness_error::ErrCode::try_from(error.err_code) {
Ok(execution_manager_liveness_error::ErrCode::MarkedDead) => Self::MarkedDead,
Ok(execution_manager_liveness_error::ErrCode::InvalidInput) => {
Self::IllegalId(error.message)
}
Ok(
execution_manager_liveness_error::ErrCode::Server
| execution_manager_liveness_error::ErrCode::Unspecified,
) => Self::Transport(error.message),
Err(error) => Self::Transport(format!(
"unknown execution manager liveness error kind: {error}"
)),
}
}
}

/// # Returns
///
/// [`storage::RegisterExecutionManagerResponse`] converted into
/// [`Result<RegistrationResponse, LivenessResponseError>`].
fn register_response_to_result(
response: storage::RegisterExecutionManagerResponse,
) -> Result<RegistrationResponse, LivenessResponseError> {
match response.result {
Some(register_execution_manager_response::Result::Registration(registration)) => {
Ok(RegistrationResponse {
em_id: ExecutionManagerId::from(registration.execution_manager_id),
session_id: registration.session_id,
})
}
Some(register_execution_manager_response::Result::Error(error)) => Err(error.into()),
None => Err(LivenessResponseError::Transport(
"register execution manager response missing result".to_owned(),
)),
}
}

/// # Returns
///
/// [`storage::UpdateExecutionManagerHeartbeatResponse`] converted into
/// [`Result<SessionId, LivenessResponseError>`].
fn heartbeat_response_to_result(
response: storage::UpdateExecutionManagerHeartbeatResponse,
) -> Result<SessionId, LivenessResponseError> {
match response.result {
Some(update_execution_manager_heartbeat_response::Result::SessionId(session_id)) => {
Ok(session_id)
}
Some(update_execution_manager_heartbeat_response::Result::Error(error)) => {
Err(error.into())
}
None => Err(LivenessResponseError::Transport(
"update execution manager heartbeat response missing result".to_owned(),
)),
}
}

/// Converts a displayable transport-layer error into [`LivenessResponseError::Transport`].
///
/// # Returns
///
/// A [`LivenessResponseError::Transport`] containing `error`'s display string.
fn to_transport_error(error: impl std::fmt::Display) -> LivenessResponseError {
LivenessResponseError::Transport(error.to_string())
}

#[cfg(test)]
mod tests {
use spider_core::types::id::ExecutionManagerId;

use super::*;
use crate::client::{LivenessResponseError, RegistrationResponse};

#[test]
fn register_response_to_result_returns_registration() {
const SESSION_ID: SessionId = 7;
const EM_ID: ExecutionManagerId = ExecutionManagerId::from(5);

let response = storage::RegisterExecutionManagerResponse {
result: Some(register_execution_manager_response::Result::Registration(
storage::ExecutionManagerRegistration {
execution_manager_id: EM_ID.get(),
session_id: SESSION_ID,
},
)),
};

let registration = register_response_to_result(response)
.expect("registration response conversion should succeed");

assert_eq!(
registration,
RegistrationResponse {
em_id: EM_ID,
session_id: SESSION_ID,
}
);
}

#[test]
fn heartbeat_response_to_result_returns_session_id() {
const SESSION_ID: SessionId = 9;

let response = storage::UpdateExecutionManagerHeartbeatResponse {
result: Some(
update_execution_manager_heartbeat_response::Result::SessionId(SESSION_ID),
),
};

let session_id = heartbeat_response_to_result(response)
.expect("heartbeat response conversion should succeed");

assert_eq!(session_id, SESSION_ID);
}

#[test]
fn liveness_storage_error_maps_invalid_input_to_illegal_id() {
const ERROR_MSG: &str = "bad em id";

let error = storage::ExecutionManagerLivenessError {
err_code: execution_manager_liveness_error::ErrCode::InvalidInput.into(),
message: ERROR_MSG.to_owned(),
};

match LivenessResponseError::from(error) {
LivenessResponseError::IllegalId(message) => {
assert_eq!(message, ERROR_MSG);
}
error => panic!("unexpected liveness response error: {error:?}"),
}
}
}
2 changes: 2 additions & 0 deletions components/spider-execution-manager/src/client/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! gRPC-backed implementations of the execution manager's client traits.

pub mod liveness;
pub mod storage;

pub use liveness::GrpcLivenessClient;
pub use storage::GrpcStorageClient;
49 changes: 27 additions & 22 deletions components/spider-execution-manager/src/client/grpc/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use spider_core::types::{
use spider_proto_rust::storage::{
self,
register_task_instance_response,
storage_error,
storage_operation_response,
task_instance_management_error,
task_instance_management_service_client::TaskInstanceManagementServiceClient,
task_instance_operation_response,
};
use tonic::transport::{Channel, Endpoint};

Expand All @@ -40,7 +40,7 @@ impl GrpcStorageClient {
pub async fn connect(endpoint: Endpoint) -> Result<Self, StorageResponseError> {
TaskInstanceManagementServiceClient::connect(endpoint)
.await
.map(|inner| Self { client: inner })
.map(|client| Self { client })
.map_err(to_transport_error)
}
}
Expand Down Expand Up @@ -136,32 +136,37 @@ impl StorageClient for GrpcStorageClient {
}
}

impl From<storage::StorageError> for StorageResponseError {
fn from(error: storage::StorageError) -> Self {
match storage_error::ErrCode::try_from(error.err_code) {
Ok(storage_error::ErrCode::StaleSession) => Self::StaleSession {
impl From<storage::TaskInstanceManagementError> for StorageResponseError {
fn from(error: storage::TaskInstanceManagementError) -> Self {
match task_instance_management_error::ErrCode::try_from(error.err_code) {
Ok(task_instance_management_error::ErrCode::StaleSession) => Self::StaleSession {
storage_session: error.storage_session,
},
Ok(storage_error::ErrCode::CacheStale) => Self::CacheStale(error.message),
Ok(storage_error::ErrCode::Transport) => Self::Transport(error.message),
Ok(storage_error::ErrCode::Server | storage_error::ErrCode::Unspecified) => {
Self::Server(error.message)
Ok(task_instance_management_error::ErrCode::CacheStale) => {
Self::CacheStale(error.message)
}
Ok(storage_error::ErrCode::InvalidInput) => Self::InvalidInput(error.message),
Err(error) => Self::Transport(format!("unknown storage error kind: {error}")),
Ok(
task_instance_management_error::ErrCode::Server
| task_instance_management_error::ErrCode::Unspecified,
) => Self::Server(error.message),
Ok(task_instance_management_error::ErrCode::InvalidInput) => {
Self::InvalidInput(error.message)
}
Err(error) => Self::Transport(format!("unknown task instance error kind: {error}")),
}
}
}

/// # Returns
///
/// [`storage::StorageOperationResponse`] converted into [`Result<(), StorageResponseError>`].
/// [`storage::TaskInstanceOperationResponse`] converted into
/// [`Result<(), StorageResponseError>`].
fn storage_operation_response_to_result(
response: storage::StorageOperationResponse,
response: storage::TaskInstanceOperationResponse,
) -> Result<(), StorageResponseError> {
match response.result {
Some(storage_operation_response::Result::Ok(_)) => Ok(()),
Some(storage_operation_response::Result::Error(error)) => Err(error.into()),
Some(task_instance_operation_response::Result::Ok(_)) => Ok(()),
Some(task_instance_operation_response::Result::Error(error)) => Err(error.into()),
None => Err(StorageResponseError::Transport(
"storage operation response missing `result` message".to_owned(),
)),
Expand All @@ -183,8 +188,8 @@ mod tests {

#[test]
fn storage_error_maps_stale_session() {
let error = storage::StorageError {
err_code: storage_error::ErrCode::StaleSession.into(),
let error = storage::TaskInstanceManagementError {
err_code: task_instance_management_error::ErrCode::StaleSession.into(),
message: "stale".to_owned(),
storage_session: 7,
};
Expand All @@ -199,23 +204,23 @@ mod tests {

#[test]
fn storage_error_maps_unknown_kind_to_transport_error() {
let error = storage::StorageError {
let error = storage::TaskInstanceManagementError {
err_code: 99,
message: "unknown".to_owned(),
storage_session: 0,
};

match StorageResponseError::from(error) {
StorageResponseError::Transport(message) => {
assert!(message.contains("unknown storage error kind"));
assert!(message.contains("unknown task instance error kind"));
}
error => panic!("unexpected storage response error: {error:?}"),
}
}

#[test]
fn missing_storage_operation_result_is_transport_error() {
match storage_operation_response_to_result(storage::StorageOperationResponse {
match storage_operation_response_to_result(storage::TaskInstanceOperationResponse {
result: None,
}) {
Err(StorageResponseError::Transport(_)) => {}
Expand Down
1 change: 1 addition & 0 deletions components/spider-proto-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ path = "src/lib.rs"
[dependencies]
prost = "0.13.5"
spider-core = { path = "../spider-core" }
thiserror = "2.0.18"
tonic = "0.12.3"

[build-dependencies]
Expand Down
17 changes: 17 additions & 0 deletions components/spider-proto-rust/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//! Error types for converting protobuf wire values into Spider core types.

/// Errors produced when converting a protobuf message into its Spider core representation.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// A protobuf task index could not be represented as a [`usize`].
#[error("task index does not fit in `usize`: {0}")]
TaskIndexOutOfRange(u64),

/// A protobuf [`crate::storage::TaskId`] carried no `kind`.
#[error("task id missing kind")]
TaskIdKindMissing,

/// A protobuf [`crate::storage::JobState`] was left unspecified.
#[error("job state is unspecified")]
JobStateUnspecified,
}
Loading
Loading