Skip to content

Commit 0861724

Browse files
committed
Implement PR feedback
1 parent dd575b6 commit 0861724

4 files changed

Lines changed: 36 additions & 32 deletions

File tree

rust/crates/sift_cli/src/cli/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,15 @@ pub struct TestServerArgs {
182182

183183
/// Whether to stream metrics to Sift.
184184
#[arg(short, long)]
185-
pub stream_metrics: Option<bool>,
185+
pub stream_metrics: bool,
186186

187187
/// The asset name to use when streaming server ingestion metrics.
188188
#[arg(short, long)]
189189
pub metrics_asset_name: Option<String>,
190+
191+
/// Include to use plain output.
192+
#[arg(short, long)]
193+
pub plain_output: bool,
190194
}
191195

192196
#[derive(clap::Args)]

rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ pub struct MetricsStreamingClient {
1717
impl MetricsStreamingClient {
1818
pub fn build(
1919
ctx: Context,
20-
stream_metrics: &Option<bool>,
20+
stream_metrics: &bool,
2121
asset_name: &Option<String>,
2222
) -> Result<Option<MetricsStreamingClient>, anyhow::Error> {
23-
if !stream_metrics.unwrap_or(false) {
23+
if !stream_metrics {
2424
return Ok(None);
2525
}
2626

rust/crates/sift_cli/src/cmd/test_server/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use tokio::sync::watch;
1414
use tonic::transport::Server;
1515
use tonic_reflection::server::Builder;
1616

17-
pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../../../sift_rs/descriptor_set.bin");
1817
pub mod metrics_streaming_client;
1918
pub mod server;
2019
use crate::cmd::test_server::metrics_streaming_client::Metrics;
@@ -56,7 +55,8 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result<ExitCode> {
5655
.calculate_metrics(
5756
&mut shutdown_rx,
5857
metrics_tx,
59-
args.stream_metrics.unwrap_or(false),
58+
args.stream_metrics,
59+
args.plain_output,
6060
)
6161
.await
6262
.context("calculate metrics task failed")
@@ -81,7 +81,10 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result<ExitCode> {
8181
});
8282

8383
let reflection_service = Builder::configure()
84-
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
84+
.register_encoded_file_descriptor_set(sift_rs::assets::v1::FILE_DESCRIPTOR_SET)
85+
.register_encoded_file_descriptor_set(sift_rs::ingest::v1::FILE_DESCRIPTOR_SET)
86+
.register_encoded_file_descriptor_set(sift_rs::ingestion_configs::v2::FILE_DESCRIPTOR_SET)
87+
.register_encoded_file_descriptor_set(sift_rs::ping::v1::FILE_DESCRIPTOR_SET)
8588
.build_v1()
8689
.context("failed to create gRPC reflection service")?;
8790

rust/crates/sift_cli/src/cmd/test_server/server.rs

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use sift_rs::ingestion_configs::v2::{ingestion_config_service_server::IngestionC
1515
use sift_rs::ping::v1::{PingRequest, PingResponse, ping_service_server::PingService};
1616
use std::io::stdout;
1717
use std::sync::atomic::AtomicBool;
18-
use std::time::Duration;
1918
use std::{
2019
collections::HashMap,
2120
sync::{
@@ -25,6 +24,7 @@ use std::{
2524
};
2625
use tokio::sync::mpsc::Sender;
2726
use tokio::sync::watch;
27+
use tokio::time::Duration;
2828
use tokio_stream::StreamExt;
2929
use tonic::{Request, Response, Status, Streaming};
3030
use uuid::Uuid;
@@ -61,7 +61,9 @@ impl TestServer {
6161
shutdown: &mut watch::Receiver<bool>,
6262
metrics_tx: Sender<Metrics>,
6363
streaming_enabled: bool,
64+
plain_output: bool,
6465
) -> AnyhowResult<()> {
66+
let mut interval = tokio::time::interval(Duration::from_secs(1));
6567
let mut stdout = stdout();
6668

6769
let mut last_total_num_bytes_read: u64 = 0;
@@ -75,7 +77,7 @@ impl TestServer {
7577
return AnyhowOk(());
7678
}
7779

78-
_ = tokio::time::sleep(Duration::from_secs(1)) => {
80+
_ = interval.tick() => {
7981
let current_total_num_bytes_read = self.total_num_bytes_read.load(Relaxed);
8082
let current_total_num_messages = self.total_num_messages.load(Relaxed);
8183
let current_total_num_streams = self.total_num_streams.load(Relaxed);
@@ -85,16 +87,21 @@ impl TestServer {
8587
last_total_num_bytes_read = current_total_num_bytes_read;
8688
last_total_num_messages = current_total_num_messages;
8789

88-
// Clear terminal and print metrics.
89-
stdout
90-
.execute(terminal::Clear(terminal::ClearType::All))
91-
.context("failed to clear terminal")?;
92-
stdout.execute(cursor::MoveTo(0, 0))
93-
.context("failed to move terminal cursor")?;
94-
stdout.execute(cursor::MoveUp(5))
95-
.context("failed to move terminal cursor")?;
96-
stdout.execute(terminal::Clear(terminal::ClearType::FromCursorDown))
97-
.context("failed to move terminal cursor")?;
90+
if !plain_output {
91+
// Clear terminal and print metrics.
92+
stdout
93+
.execute(terminal::Clear(terminal::ClearType::All))
94+
.context("failed to clear terminal")?;
95+
stdout.execute(cursor::MoveTo(0, 0))
96+
.context("failed to move terminal cursor")?;
97+
stdout.execute(cursor::MoveUp(5))
98+
.context("failed to move terminal cursor")?;
99+
stdout.execute(terminal::Clear(terminal::ClearType::FromCursorDown))
100+
.context("failed to move terminal cursor")?;
101+
} else {
102+
Output::new().line(format!("-----")).print();
103+
Output::new().line(format!("{}", chrono::Local::now().to_rfc3339())).print();
104+
}
98105

99106
Output::new().line(format!("Total num streams: {current_total_num_streams}")).print();
100107
Output::new().line(format!("Total num bytes: {current_total_num_bytes_read}")).print();
@@ -159,9 +166,7 @@ impl AssetService for TestServer {
159166
&self,
160167
_request: Request<sift_rs::assets::v1::DeleteAssetRequest>,
161168
) -> Result<Response<sift_rs::assets::v1::DeleteAssetResponse>, Status> {
162-
Ok(Response::new(
163-
sift_rs::assets::v1::DeleteAssetResponse::default(),
164-
))
169+
unimplemented!()
165170
}
166171

167172
/// No-op.
@@ -179,19 +184,15 @@ impl AssetService for TestServer {
179184
&self,
180185
_request: Request<sift_rs::assets::v1::UpdateAssetRequest>,
181186
) -> Result<Response<sift_rs::assets::v1::UpdateAssetResponse>, Status> {
182-
Ok(Response::new(
183-
sift_rs::assets::v1::UpdateAssetResponse::default(),
184-
))
187+
unimplemented!()
185188
}
186189

187190
/// No-op.
188191
async fn archive_asset(
189192
&self,
190193
_request: Request<sift_rs::assets::v1::ArchiveAssetRequest>,
191194
) -> Result<Response<sift_rs::assets::v1::ArchiveAssetResponse>, Status> {
192-
Ok(Response::new(
193-
sift_rs::assets::v1::ArchiveAssetResponse::default(),
194-
))
195+
unimplemented!()
195196
}
196197
}
197198

@@ -209,11 +210,7 @@ impl IngestionConfigService for TestServer {
209210
.ok_or(Status::not_found("ingestion config not found"))?;
210211

211212
Ok(Response::new(GetIngestionConfigResponse {
212-
ingestion_config: Some(IngestionConfig {
213-
ingestion_config_id: ingestion_config.ingestion_config_id.clone(),
214-
asset_id: ingestion_config.asset_id.clone(),
215-
client_key: ingestion_config.client_key.clone(),
216-
}),
213+
ingestion_config: Some(ingestion_config.clone()),
217214
}))
218215
}
219216

0 commit comments

Comments
 (0)