Skip to content

Commit 60acbc8

Browse files
committed
feat(relay): activate DMQ
1 parent adfed6f commit 60acbc8

File tree

8 files changed

+3
-83
lines changed

8 files changed

+3
-83
lines changed

mithril-relay/Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ license = { workspace = true }
1010
repository = { workspace = true }
1111

1212
[features]
13-
default = ["future_dmq"]
1413
bundle_tls = ["reqwest/native-tls-vendored"]
15-
future_dmq = ["dep:mithril-dmq"]
1614

1715
[dependencies]
1816
anyhow = { workspace = true }
@@ -37,7 +35,7 @@ libp2p = { version = "0.56.0", features = [
3735
"yamux",
3836
] }
3937
mithril-common = { path = "../mithril-common" }
40-
mithril-dmq = { path = "../internal/mithril-dmq", optional = true }
38+
mithril-dmq = { path = "../internal/mithril-dmq" }
4139
mithril-doc = { path = "../internal/mithril-doc" }
4240
mithril-test-http-server = { path = "../internal/tests/mithril-test-http-server" }
4341
reqwest = { workspace = true, features = [

mithril-relay/src/commands/aggregator.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
#[cfg(feature = "future_dmq")]
21
use std::path::PathBuf;
32

43
use clap::Parser;
54
use libp2p::Multiaddr;
65
use slog::error;
76

87
use mithril_common::StdResult;
9-
#[cfg(feature = "future_dmq")]
108
use mithril_dmq::DmqNetwork;
119

1210
use crate::AggregatorRelay;
@@ -24,7 +22,6 @@ pub struct AggregatorCommand {
2422
dial_to: Option<Multiaddr>,
2523

2624
/// Path to the DMQ socket file
27-
#[cfg(feature = "future_dmq")]
2825
#[clap(
2926
long,
3027
env = "DMQ_NODE_SOCKET_PATH",
@@ -34,13 +31,11 @@ pub struct AggregatorCommand {
3431
dmq_node_socket_path: PathBuf,
3532

3633
/// DMQ network
37-
#[cfg(feature = "future_dmq")]
3834
#[clap(long, env = "NETWORK")]
3935
pub network: String,
4036

4137
/// DMQ Network Magic number
4238
/// useful for TestNet & DevNet
43-
#[cfg(feature = "future_dmq")]
4439
#[clap(long, env = "DMQ_NETWORK_MAGIC")]
4540
pub dmq_network_magic: Option<u64>,
4641

@@ -56,14 +51,11 @@ impl AggregatorCommand {
5651
let dial_to = self.dial_to.to_owned();
5752
let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", self.listen_port).parse()?;
5853
let aggregator_endpoint = self.aggregator_endpoint.to_owned();
59-
#[cfg(feature = "future_dmq")]
6054
let dmq_network = DmqNetwork::from_code(self.network.to_owned(), self.dmq_network_magic)?;
6155

6256
let mut relay = AggregatorRelay::start(
6357
&addr,
64-
#[cfg(feature = "future_dmq")]
6558
&self.dmq_node_socket_path,
66-
#[cfg(feature = "future_dmq")]
6759
&dmq_network,
6860
&aggregator_endpoint,
6961
logger,

mithril-relay/src/commands/signer.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#[cfg(feature = "future_dmq")]
21
use std::path::PathBuf;
32
use std::time::Duration;
43

@@ -8,7 +7,6 @@ use libp2p::Multiaddr;
87
use slog::error;
98

109
use mithril_common::StdResult;
11-
#[cfg(feature = "future_dmq")]
1210
use mithril_dmq::DmqNetwork;
1311

1412
use crate::{SignerRelay, SignerRelayConfiguration, SignerRelayMode};
@@ -30,7 +28,6 @@ pub struct SignerCommand {
3028
dial_to: Option<Multiaddr>,
3129

3230
/// Path to the DMQ socket file
33-
#[cfg(feature = "future_dmq")]
3431
#[clap(
3532
long,
3633
env = "DMQ_NODE_SOCKET_PATH",
@@ -40,13 +37,11 @@ pub struct SignerCommand {
4037
dmq_node_socket_path: PathBuf,
4138

4239
/// Cardano network
43-
#[cfg(feature = "future_dmq")]
4440
#[clap(long, env = "NETWORK")]
4541
pub network: String,
4642

4743
/// Cardano Network Magic number
4844
/// useful for TestNet & DevNet
49-
#[cfg(feature = "future_dmq")]
5045
#[clap(long, env = "DMQ_NETWORK_MAGIC")]
5146
pub dmq_network_magic: Option<u64>,
5247

@@ -78,15 +73,12 @@ impl SignerCommand {
7873
let signature_registration_mode = &self.signature_registration_mode;
7974
let aggregator_endpoint = self.aggregator_endpoint.to_owned();
8075
let signer_repeater_delay = Duration::from_millis(self.signer_repeater_delay);
81-
#[cfg(feature = "future_dmq")]
8276
let dmq_network = DmqNetwork::from_code(self.network.to_owned(), self.dmq_network_magic)?;
8377

8478
let mut relay = SignerRelay::start(SignerRelayConfiguration {
8579
address: &addr,
8680
server_port: &server_port,
87-
#[cfg(feature = "future_dmq")]
8881
dmq_node_socket_path: &self.dmq_node_socket_path,
89-
#[cfg(feature = "future_dmq")]
9082
dmq_network: &dmq_network,
9183
signer_registration_mode,
9284
signature_registration_mode,

mithril-relay/src/p2p/peer.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use mithril_common::{
1515
logging::LoggerExtensions,
1616
messages::{RegisterSignatureMessageHttp, RegisterSignerMessage},
1717
};
18-
#[cfg(feature = "future_dmq")]
1918
use mithril_dmq::DmqMessage;
2019
use serde::{Deserialize, Serialize};
2120
use slog::{Logger, debug, info};
@@ -73,7 +72,6 @@ pub enum BroadcastMessage {
7372
RegisterSignatureHttp(RegisterSignatureMessageHttp),
7473

7574
/// A DMQ signature registration message received from the Gossip sub
76-
#[cfg(feature = "future_dmq")]
7775
RegisterSignatureDmq(DmqMessage),
7876
}
7977

@@ -260,7 +258,6 @@ impl Peer {
260258
}
261259

262260
/// Publish a DMQ signature on the P2P pubsub
263-
#[cfg(feature = "future_dmq")]
264261
pub fn publish_signature_dmq(
265262
&mut self,
266263
message: &DmqMessage,

mithril-relay/src/relay/aggregator.rs

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
1-
#[cfg(feature = "future_dmq")]
21
use std::{path::Path, sync::Arc};
32

4-
#[cfg(feature = "future_dmq")]
53
use anyhow::Context;
64
use anyhow::anyhow;
75
use libp2p::Multiaddr;
86
use reqwest::StatusCode;
97
use slog::{Logger, error, info};
10-
#[cfg(feature = "future_dmq")]
118
use tokio::sync::{
129
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
1310
watch::{self, Receiver, Sender},
@@ -18,7 +15,6 @@ use mithril_common::{
1815
logging::LoggerExtensions,
1916
messages::{RegisterSignatureMessageHttp, RegisterSignerMessage},
2017
};
21-
#[cfg(feature = "future_dmq")]
2218
use mithril_dmq::{DmqConsumerServer, DmqConsumerServerPallas, DmqMessage, DmqNetwork};
2319

2420
use crate::p2p::{BroadcastMessage, Peer, PeerEvent};
@@ -27,9 +23,7 @@ use crate::p2p::{BroadcastMessage, Peer, PeerEvent};
2723
pub struct AggregatorRelay {
2824
aggregator_endpoint: String,
2925
peer: Peer,
30-
#[cfg(feature = "future_dmq")]
3126
signature_dmq_tx: UnboundedSender<DmqMessage>,
32-
#[cfg(feature = "future_dmq")]
3327
stop_tx: Sender<()>,
3428
logger: Logger,
3529
}
@@ -38,14 +32,13 @@ impl AggregatorRelay {
3832
/// Start a relay for a Mithril aggregator
3933
pub async fn start(
4034
addr: &Multiaddr,
41-
#[cfg(feature = "future_dmq")] dmq_node_socket_path: &Path,
42-
#[cfg(feature = "future_dmq")] dmq_network: &DmqNetwork,
35+
dmq_node_socket_path: &Path,
36+
dmq_network: &DmqNetwork,
4337
aggregator_endpoint: &str,
4438
logger: &Logger,
4539
) -> StdResult<Self> {
4640
let peer = Peer::new(addr).with_logger(logger).start().await?;
4741
let logger = logger.new_with_component_name::<Self>();
48-
#[cfg(feature = "future_dmq")]
4942
{
5043
let (stop_tx, stop_rx) = watch::channel(());
5144
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
@@ -67,25 +60,17 @@ impl AggregatorRelay {
6760
logger,
6861
})
6962
}
70-
#[cfg(not(feature = "future_dmq"))]
71-
Ok(Self {
72-
aggregator_endpoint: aggregator_endpoint.to_owned(),
73-
peer,
74-
logger,
75-
})
7663
}
7764

7865
/// Stop the aggregator relay
7966
pub async fn stop(&self) -> StdResult<()> {
80-
#[cfg(feature = "future_dmq")]
8167
self.stop_tx
8268
.send(())
8369
.with_context(|| "Failed to send stop signal to DMQ consumer server")?;
8470

8571
Ok(())
8672
}
8773

88-
#[cfg(feature = "future_dmq")]
8974
async fn start_dmq_consumer_server(
9075
socket: &Path,
9176
dmq_network: &DmqNetwork,
@@ -206,7 +191,6 @@ impl AggregatorRelay {
206191
}
207192
}
208193
}
209-
#[cfg(feature = "future_dmq")]
210194
Ok(Some(BroadcastMessage::RegisterSignatureDmq(signature_message_received))) => {
211195
self.signature_dmq_tx.send(signature_message_received).with_context(
212196
|| "Failed to send signature message to DMQ consumer server",
@@ -266,9 +250,7 @@ mod tests {
266250
let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
267251
let relay = AggregatorRelay::start(
268252
&addr,
269-
#[cfg(feature = "future_dmq")]
270253
Path::new("test"),
271-
#[cfg(feature = "future_dmq")]
272254
&DmqNetwork::TestNet(123),
273255
&server.url(""),
274256
&TestLogger::stdout(),

mithril-relay/src/relay/passive.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ impl PassiveRelay {
3838
Ok(Some(BroadcastMessage::RegisterSignatureHttp(signature_message_received))) => {
3939
info!(self.logger, "Received HTTP signature message from P2P network"; "signature_message" => #?signature_message_received);
4040
}
41-
#[cfg(feature = "future_dmq")]
4241
Ok(Some(BroadcastMessage::RegisterSignatureDmq(signature_message_received))) => {
4342
info!(self.logger, "Received DMQ signature message from P2P network"; "signature_message" => #?signature_message_received);
4443
}

mithril-relay/src/relay/signer.rs

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
1-
#[cfg(feature = "future_dmq")]
21
use std::path::Path;
32
use std::{net::SocketAddr, sync::Arc, time::Duration};
43

5-
#[cfg(feature = "future_dmq")]
64
use anyhow::Context;
75
use clap::ValueEnum;
86
use libp2p::Multiaddr;
9-
#[cfg(feature = "future_dmq")]
107
use slog::error;
118
use slog::{Logger, debug, info};
129
use strum::Display;
1310
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
14-
#[cfg(feature = "future_dmq")]
1511
use tokio::sync::watch::{self, Receiver, Sender};
1612
use warp::Filter;
1713

@@ -20,7 +16,6 @@ use mithril_common::{
2016
logging::LoggerExtensions,
2117
messages::{RegisterSignatureMessageHttp, RegisterSignerMessage},
2218
};
23-
#[cfg(feature = "future_dmq")]
2419
use mithril_dmq::{DmqMessage, DmqNetwork, DmqPublisherServer, DmqPublisherServerPallas};
2520
use mithril_test_http_server::{TestHttpServer, test_http_server_with_socket_address};
2621

@@ -63,10 +58,8 @@ pub struct SignerRelayConfiguration<'a> {
6358
/// Port on which the HTTP server will listen
6459
pub server_port: &'a u16,
6560
/// Path to the DMQ node socket file
66-
#[cfg(feature = "future_dmq")]
6761
pub dmq_node_socket_path: &'a Path,
6862
/// DMQ network
69-
#[cfg(feature = "future_dmq")]
7063
pub dmq_network: &'a DmqNetwork,
7164
/// Signer registration mode
7265
pub signer_registration_mode: &'a SignerRelayMode,
@@ -85,11 +78,9 @@ pub struct SignerRelay {
8578
http_server: TestHttpServer,
8679
peer: Peer,
8780
signature_http_rx: UnboundedReceiver<RegisterSignatureMessageHttp>,
88-
#[cfg(feature = "future_dmq")]
8981
signature_dmq_rx: UnboundedReceiver<DmqMessage>,
9082
signer_http_rx: UnboundedReceiver<RegisterSignerMessage>,
9183
signer_repeater: Arc<MessageRepeater<RegisterSignerMessage>>,
92-
#[cfg(feature = "future_dmq")]
9384
stop_tx: Sender<()>,
9485
logger: Logger,
9586
}
@@ -120,7 +111,6 @@ impl SignerRelay {
120111
.await;
121112
info!(relay_logger, "Listening on"; "address" => ?http_server.address());
122113

123-
#[cfg(feature = "future_dmq")]
124114
{
125115
let (stop_tx, stop_rx) = watch::channel(());
126116
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
@@ -145,28 +135,17 @@ impl SignerRelay {
145135
logger: relay_logger,
146136
})
147137
}
148-
#[cfg(not(feature = "future_dmq"))]
149-
Ok(Self {
150-
http_server,
151-
peer,
152-
signature_http_rx: signature_rx,
153-
signer_http_rx: signer_rx,
154-
signer_repeater,
155-
logger: relay_logger,
156-
})
157138
}
158139

159140
/// Stop the signer relay
160141
pub async fn stop(&self) -> StdResult<()> {
161-
#[cfg(feature = "future_dmq")]
162142
self.stop_tx
163143
.send(())
164144
.with_context(|| "Failed to send stop signal to DMQ publisher server")?;
165145

166146
Ok(())
167147
}
168148

169-
#[cfg(feature = "future_dmq")]
170149
async fn start_dmq_publisher_server(
171150
socket: &Path,
172151
dmq_network: &DmqNetwork,
@@ -288,7 +267,6 @@ impl SignerRelay {
288267
}
289268
}
290269

291-
#[cfg(feature = "future_dmq")]
292270
fn process_register_signature_dmq_message(
293271
&mut self,
294272
message: Option<DmqMessage>,
@@ -306,7 +284,6 @@ impl SignerRelay {
306284

307285
/// Tick the signer relay
308286
pub async fn tick(&mut self) -> StdResult<()> {
309-
#[cfg(feature = "future_dmq")]
310287
tokio::select! {
311288
message = self.signature_http_rx.recv() => {
312289
self.process_register_signature_http_message(message)
@@ -320,17 +297,6 @@ impl SignerRelay {
320297
_ = self.signer_repeater.repeat_message() => {Ok(())},
321298
_event = self.peer.tick_swarm() => {Ok(())}
322299
}
323-
#[cfg(not(feature = "future_dmq"))]
324-
tokio::select! {
325-
message = self.signature_http_rx.recv() => {
326-
self.process_register_signature_http_message(message)
327-
},
328-
message = self.signer_http_rx.recv() => {
329-
self.process_register_signer_http_message(message)
330-
},
331-
_ = self.signer_repeater.repeat_message() => {Ok(())},
332-
_event = self.peer.tick_swarm() => {Ok(())}
333-
}
334300
}
335301

336302
/// Receive signature from the underlying channel

mithril-relay/tests/register_signer_signature.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#[cfg(feature = "future_dmq")]
21
use std::path::PathBuf;
32
use std::{sync::Arc, time::Duration};
43

@@ -9,7 +8,6 @@ use slog_scope::{error, info};
98

109
use mithril_common::messages::{RegisterSignatureMessageHttp, RegisterSignerMessage};
1110
use mithril_common::test::double::Dummy;
12-
#[cfg(feature = "future_dmq")]
1311
use mithril_dmq::DmqNetwork;
1412
use mithril_relay::{
1513
PassiveRelay, SignerRelay, SignerRelayConfiguration, SignerRelayMode,
@@ -41,9 +39,7 @@ async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() {
4139
let total_peers = 1 + total_p2p_client;
4240
let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
4341
let server_port = 0;
44-
#[cfg(feature = "future_dmq")]
4542
let dmq_node_socket_path = PathBuf::new();
46-
#[cfg(feature = "future_dmq")]
4743
let dmq_network = DmqNetwork::TestNet(123);
4844
let signer_registration_mode = SignerRelayMode::P2P;
4945
let signature_registration_mode = SignerRelayMode::P2P;
@@ -52,9 +48,7 @@ async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() {
5248
let mut signer_relay = SignerRelay::start(SignerRelayConfiguration {
5349
address: &addr,
5450
server_port: &server_port,
55-
#[cfg(feature = "future_dmq")]
5651
dmq_node_socket_path: &dmq_node_socket_path,
57-
#[cfg(feature = "future_dmq")]
5852
dmq_network: &dmq_network,
5953
signer_registration_mode: &signer_registration_mode,
6054
signature_registration_mode: &signature_registration_mode,

0 commit comments

Comments
 (0)