From 56152aaed395d65dade3e22aa5235eb16bfc8e48 Mon Sep 17 00:00:00 2001 From: Wukong Date: Thu, 14 Aug 2025 08:56:48 +0530 Subject: [PATCH] make tracker a library --- Cargo.toml | 8 ++ src/lib.rs | 176 +++++++++++++++++++++++++++++++++ src/main.rs | 277 ++++++---------------------------------------------- 3 files changed, 212 insertions(+), 249 deletions(-) create mode 100644 src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index bebbbcc..11fd552 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,14 @@ name = "tracker" version = "0.1.0" edition = "2024" +[lib] +name = "tracker" +path = "src/lib.rs" + +[[bin]] +name = "tracker" +path = "src/main.rs" + [dependencies] bitcoincore-rpc = "0.19.0" chrono = { version = "0.4", features = ["serde"] } diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..298df41 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,176 @@ +#![allow(warnings)] +use bitcoincore_rpc::{Auth, Client}; +use diesel::SqliteConnection; +use diesel::r2d2::ConnectionManager; +use r2d2::Pool; +use std::path::Path; +use std::sync::Arc; +use tokio::sync::mpsc; +use tracing::{error, info, warn}; + +use crate::status::{State, Status}; +use crate::types::DbRequest; + +mod db; +mod error; +mod handle_error; +mod indexer; +mod server; +mod status; +mod tor; +mod types; +mod utils; + +use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); + +#[derive(Debug, Clone)] +pub struct Config { + pub rpc_url: String, + pub rpc_auth: Auth, + pub address: String, + pub control_port: u16, + pub tor_auth_password: String, + pub socks_port: u16, + pub datadir: String, +} + +fn run_migrations(pool: Arc>>) { + let mut conn = pool + .get() + .expect("Failed to get DB connection for migrations"); + conn.run_pending_migrations(MIGRATIONS) + .expect("Migration failed"); +} + +pub async fn start(cfg: Config) { + info!("Connecting to indexer db"); + let database_url = format!("{}/tracker.db", cfg.datadir); + if let Some(parent) = Path::new(&database_url).parent() { + std::fs::create_dir_all(parent).expect("Failed to create database directory"); + } + let manager = ConnectionManager::::new(database_url); + let pool = Arc::new( + Pool::builder() + .build(manager) + .expect("Failed to create DB pool"), + ); + run_migrations(pool.clone()); + info!("Connected to indexer db"); + + tor::check_tor_status(cfg.control_port, &cfg.tor_auth_password) + .await + .expect("Failed to check Tor status"); + + let hostname = match cfg.address.split_once(':') { + Some((_, port)) => { + let port = port.parse::().expect("Invalid port in address"); + tor::get_tor_hostname( + Path::new(&cfg.datadir), + cfg.control_port, + port, + &cfg.tor_auth_password, + ) + .await + .expect("Failed to retrieve Tor hostname") + } + None => { + error!("Invalid address format. Expected format: :"); + return; + } + }; + + info!("Tracker is listening at {}", hostname); + + let (mut db_tx, db_rx) = mpsc::channel::(10); + let (status_tx, mut status_rx) = mpsc::channel::(10); + + let rpc_client = Client::new(&cfg.rpc_url, cfg.rpc_auth.clone()).unwrap(); + + spawn_db_manager(pool.clone(), db_rx, status_tx.clone()).await; + spawn_mempool_indexer(pool.clone(), db_tx.clone(), status_tx.clone(), rpc_client).await; + spawn_server( + db_tx.clone(), + status_tx.clone(), + cfg.address.clone(), + cfg.socks_port, + hostname.clone(), + ) + .await; + + info!("Tracker started"); + + while let Some(status) = status_rx.recv().await { + match status.state { + State::DBShutdown(err) => { + warn!( + "DB Manager exited unexpectedly. Restarting... Error: {:?}", + err + ); + let (new_db_tx, new_db_rx) = mpsc::channel::(10); + db_tx = new_db_tx; + spawn_db_manager(pool.clone(), new_db_rx, status_tx.clone()).await; + } + State::Healthy(info) => { + info!("System healthy: {:?}", info); + } + State::MempoolShutdown(err) => { + warn!("Mempool Indexer crashed. Restarting... Error: {:?}", err); + let client = Client::new(&cfg.rpc_url, cfg.rpc_auth.clone()).unwrap(); + spawn_mempool_indexer(pool.clone(), db_tx.clone(), status_tx.clone(), client).await; + } + State::ServerShutdown(err) => { + warn!("Server crashed. Restarting... Error: {:?}", err); + spawn_server( + db_tx.clone(), + status_tx.clone(), + cfg.address.clone(), + cfg.socks_port, + hostname.clone(), + ) + .await; + } + } + } +} + +async fn spawn_db_manager( + pool: Arc>>, + db_rx: tokio::sync::mpsc::Receiver, + status_tx: tokio::sync::mpsc::Sender, +) { + info!("Spawning db manager"); + tokio::spawn(db::run(pool, db_rx, status::Sender::DBManager(status_tx))); +} + +async fn spawn_mempool_indexer( + pool: Arc>>, + db_tx: tokio::sync::mpsc::Sender, + status_tx: tokio::sync::mpsc::Sender, + client: Client, +) { + info!("Spawning indexer"); + tokio::spawn(indexer::run( + pool, + db_tx, + status::Sender::Mempool(status_tx), + client.into(), + )); +} + +async fn spawn_server( + db_tx: tokio::sync::mpsc::Sender, + status_tx: tokio::sync::mpsc::Sender, + address: String, + socks_port: u16, + hostname: String, +) { + info!("Spawning server instance"); + tokio::spawn(server::run( + db_tx, + status::Sender::Server(status_tx), + address, + socks_port, + hostname, + )); +} diff --git a/src/main.rs b/src/main.rs index c0c6ebc..4416e06 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,265 +1,44 @@ -#![allow(dead_code)] -use std::path::Path; -use std::sync::Arc; - use bitcoincore_rpc::Auth; -use bitcoincore_rpc::Client; use clap::Parser; -use diesel::SqliteConnection; -use diesel::r2d2::ConnectionManager; -use error::TrackerError; -use r2d2::Pool; -use status::{State, Status}; -use tokio::sync::mpsc::{self, Receiver, Sender}; -use tor::check_tor_status; -use tor::get_tor_hostname; -use tracing::error; -use tracing::{info, warn}; -use types::DbRequest; -mod db; -mod error; -mod handle_error; -mod indexer; -mod server; -mod status; -mod tor; -mod types; -mod utils; +use tracker::{Config, start}; #[derive(Parser)] -#[clap(version = option_env ! ("CARGO_PKG_VERSION").unwrap_or("unknown"), -author = option_env ! ("CARGO_PKG_AUTHORS").unwrap_or(""))] struct App { - #[clap( - name = "ADDRESS:PORT", - long, - short = 'r', - default_value = "127.0.0.1:18443" - )] - pub(crate) rpc: String, - #[clap( - name = "USER:PASSWORD", - short = 'a', - long, - value_parser = parse_proxy_auth, - default_value = "username:password", - )] - pub auth: (String, String), - #[clap( - name = "Server ADDRESS:PORT", - short = 's', - long, - default_value = "127.0.0.1:8080" - )] - pub address: String, - - #[clap(name = "control port PORT", short = 'c', long, default_value = "9051")] - pub control_port: u16, - - #[clap(name = "tor_auth_password", long, default_value = "")] - pub tor_auth_password: String, - - #[clap(name = "socks port PORT", long, default_value = "9050")] - pub socks_port: u16, - - #[clap(name = "datadir", long, default_value = ".tracker")] - pub datadir: String, -} - -fn parse_proxy_auth(s: &str) -> Result<(String, String), TrackerError> { - let parts: Vec<_> = s.split(':').collect(); - if parts.len() != 2 { - return Err(TrackerError::ParsingError); - } - - let user = parts[0].to_string(); - let passwd = parts[1].to_string(); - - Ok((user, passwd)) -} - -#[derive(Debug, Clone)] -pub struct RPCConfig { - pub url: String, - pub auth: Auth, -} - -const RPC_HOSTPORT: &str = "localhost:18443"; - -impl RPCConfig { - fn new(url: String, auth: Auth) -> Self { - RPCConfig { url, auth } - } -} - -impl Default for RPCConfig { - fn default() -> Self { - Self { - url: RPC_HOSTPORT.to_string(), - auth: Auth::UserPass("regtestrpcuser".to_string(), "regtestrpcpass".to_string()), - } - } -} - -impl From for Client { - fn from(value: RPCConfig) -> Self { - Client::new(&value.url, value.auth.clone()).unwrap() - } -} - -use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; - -pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); - -fn run_migrations(pool: Arc>>) { - let mut conn = pool - .get() - .expect("Failed to get DB connection for migrations"); - conn.run_pending_migrations(MIGRATIONS) - .expect("Migration failed"); + #[clap(long, short = 'r', default_value = "127.0.0.1:18443")] + rpc: String, + #[clap(short = 'a', long, default_value = "username:password")] + auth: String, + #[clap(short = 's', long, default_value = "127.0.0.1:8080")] + address: String, + #[clap(short = 'c', long, default_value = "9051")] + control_port: u16, + #[clap(long, default_value = "")] + tor_auth_password: String, + #[clap(long, default_value = "9050")] + socks_port: u16, + #[clap(long, default_value = ".tracker")] + datadir: String, } #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); - let args = App::parse(); - let rpc_config = RPCConfig::new(args.rpc, Auth::UserPass(args.auth.0, args.auth.1)); - - info!("Connecting to indexer db"); - let database_url = format!("{}/tracker.db", args.datadir); - if let Some(parent) = Path::new(&database_url).parent() { - std::fs::create_dir_all(parent).expect("Failed to create database directory"); - } - let manager = ConnectionManager::::new(database_url); - let pool = Arc::new( - Pool::builder() - .build(manager) - .expect("Failed to create DB pool"), - ); - run_migrations(pool.clone()); - info!("Connected to indexer db"); - - check_tor_status(args.control_port, &args.tor_auth_password) - .await - .expect("Failed to check Tor status"); - - let hostname = match args.address.split_once(':') { - Some((_, port)) => { - let port = port.parse::().expect("Invalid port in address"); - get_tor_hostname( - Path::new(&args.datadir), - args.control_port, - port, - &args.tor_auth_password, - ) - .await - .expect("Failed to retrieve Tor hostname") - } - None => { - error!("Invalid address format. Expected format: :"); - return; - } + let (user, pass) = { + let parts: Vec<_> = args.auth.split(':').collect(); + (parts[0].to_string(), parts[1].to_string()) }; - info!("Tracker is listening at {}", hostname); - - let (mut db_tx, db_rx) = mpsc::channel::(10); - let (status_tx, mut status_rx) = mpsc::channel::(10); - - let server_address = args.address.clone(); - - spawn_db_manager(pool.clone(), db_rx, status_tx.clone()).await; - spawn_mempool_indexer( - pool.clone(), - db_tx.clone(), - status_tx.clone(), - rpc_config.clone().into(), - ) - .await; - spawn_server( - db_tx.clone(), - status_tx.clone(), - server_address.clone(), - args.socks_port, - hostname.clone(), - ) - .await; - - info!("Tracker started"); - - while let Some(status) = status_rx.recv().await { - match status.state { - State::DBShutdown(err) => { - warn!( - "DB Manager exited unexpectedly. Restarting... Error: {:?}", - err - ); - let (new_db_tx, new_db_rx) = mpsc::channel::(10); - db_tx = new_db_tx; - spawn_db_manager(pool.clone(), new_db_rx, status_tx.clone()).await; - } - State::Healthy(info) => { - info!("System healthy: {:?}", info); - } - State::MempoolShutdown(err) => { - warn!("Mempool Indexer crashed. Restarting... Error: {:?}", err); - let client: Client = rpc_config.clone().into(); - spawn_mempool_indexer(pool.clone(), db_tx.clone(), status_tx.clone(), client).await; - } - State::ServerShutdown(err) => { - warn!("Server crashed. Restarting... Error: {:?}", err); - spawn_server( - db_tx.clone(), - status_tx.clone(), - server_address.clone(), - args.socks_port, - hostname.clone(), - ) - .await; - } - } - } -} - -async fn spawn_db_manager( - pool: Arc>>, - db_tx: Receiver, - status_tx: Sender, -) { - info!("Spawning db manager"); - tokio::spawn(db::run(pool, db_tx, status::Sender::DBManager(status_tx))); -} - -async fn spawn_mempool_indexer( - pool: Arc>>, - db_tx: Sender, - status_tx: Sender, - client: Client, -) { - info!("Spawning indexer"); - tokio::spawn(indexer::run( - pool, - db_tx, - status::Sender::Mempool(status_tx), - client.into(), - )); -} + let cfg = Config { + rpc_url: args.rpc, + rpc_auth: Auth::UserPass(user, pass), + address: args.address, + control_port: args.control_port, + tor_auth_password: args.tor_auth_password, + socks_port: args.socks_port, + datadir: args.datadir, + }; -async fn spawn_server( - db_tx: Sender, - status_tx: Sender, - address: String, - socks_port: u16, - hostname: String, -) { - info!("Spawning server instance"); - tokio::spawn(server::run( - db_tx, - status::Sender::Server(status_tx), - address, - socks_port, - hostname, - )); + start(cfg).await; }