diff --git a/.gitignore b/.gitignore index f687a5b..0d96e05 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target .ra-target -.tracker \ No newline at end of file +.tracker +tracker.db \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 959d9dc..9a83cb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.18" @@ -209,6 +224,12 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "bumpalo" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" + [[package]] name = "bytes" version = "1.10.1" @@ -230,6 +251,21 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "serde", + "wasm-bindgen", + "windows-link", +] + [[package]] name = "clap" version = "4.5.37" @@ -276,12 +312,134 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn", +] + +[[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + +[[package]] +name = "diesel" +version = "2.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "229850a212cd9b84d4f0290ad9d294afc0ae70fccaa8949dbe8b43ffafa1e20c" +dependencies = [ + "chrono", + "diesel_derives", + "libsqlite3-sys", + "r2d2", + "time", +] + +[[package]] +name = "diesel_derives" +version = "2.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b96984c469425cb577bf6f17121ecb3e4fe1e81de5d8f780dd372802858d756" +dependencies = [ + "diesel_table_macro_syntax", + "dsl_auto_type", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "diesel_migrations" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a73ce704bad4231f001bff3314d91dce4aba0770cee8b233991859abc15c1f6" +dependencies = [ + "diesel", + "migrations_internals", + "migrations_macros", +] + +[[package]] +name = "diesel_table_macro_syntax" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "209c735641a413bc68c4923a9d6ad4bcb3ca306b794edaa7eb0b3228a99ffb25" +dependencies = [ + "syn", +] + +[[package]] +name = "dsl_auto_type" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139ae9aca7527f85f26dd76483eb38533fd84bd571065da1739656ef71c5ff5b" +dependencies = [ + "darling", + "either", + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "futures-core" version = "0.3.31" @@ -348,6 +506,12 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b43ede17f21864e81be2fa654110bf1e793774238d86ef8555c37e6519c0403" +[[package]] +name = "hashbrown" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" + [[package]] name = "heck" version = "0.5.0" @@ -375,6 +539,46 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3011d1213f159867b13cfd6ac92d2cd5f1345762c63be3554e84092d85a50bbd" +[[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + +[[package]] +name = "indexmap" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -387,6 +591,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "js-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "jsonrpc" version = "0.18.0" @@ -411,6 +625,16 @@ version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +[[package]] +name = "libsqlite3-sys" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" +dependencies = [ + "pkg-config", + "vcpkg", +] + [[package]] name = "lock_api" version = "0.4.12" @@ -458,6 +682,27 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "migrations_internals" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bda1634d70d5bd53553cf15dca9842a396e8c799982a3ad22998dc44d961f24" +dependencies = [ + "serde", + "toml", +] + +[[package]] +name = "migrations_macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb161cc72176cb37aa47f1fc520d3ef02263d67d661f44f05d05a079e1237fd" +dependencies = [ + "migrations_internals", + "proc-macro2", + "quote", +] + [[package]] name = "miniz_oxide" version = "0.8.8" @@ -499,6 +744,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.7" @@ -555,6 +815,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -582,6 +854,17 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + [[package]] name = "rand" version = "0.8.5" @@ -683,6 +966,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -758,6 +1050,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40734c41988f7306bb04f0ecf60ec0f3f1caa34290e4e8ea471dcd3346483b83" +dependencies = [ + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -854,6 +1155,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + +[[package]] +name = "time-macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tokio" version = "1.45.0" @@ -921,6 +1253,45 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed0aee96c12fa71097902e0bb061a5e1ebd766a6636bb605ba401c45c1650eac" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "toml_parser", + "toml_writer", + "winnow", +] + +[[package]] +name = "toml_datetime" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bade1c3e902f58d73d3f294cd7f20391c1cb2fbcb643b73566bc773971df91e3" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_parser" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97200572db069e74c512a14117b296ba0a80a30123fbbb5aa1f4a348f639ca30" +dependencies = [ + "winnow", +] + +[[package]] +name = "toml_writer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64" + [[package]] name = "tracing" version = "0.1.41" @@ -987,8 +1358,12 @@ name = "tracker" version = "0.1.0" dependencies = [ "bitcoincore-rpc", + "chrono", "clap", + "diesel", + "diesel_migrations", "hex", + "r2d2", "serde", "serde_cbor", "tokio", @@ -1017,12 +1392,76 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1090,6 +1529,12 @@ dependencies = [ "syn", ] +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + [[package]] name = "windows-result" version = "0.2.0" @@ -1191,6 +1636,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95" + [[package]] name = "zerocopy" version = "0.8.25" diff --git a/Cargo.toml b/Cargo.toml index 4b44e6e..bebbbcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,12 @@ edition = "2024" [dependencies] bitcoincore-rpc = "0.19.0" +chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.5.37", features = ["derive"] } +diesel = { version = "2.2.12", features = ["chrono", "sqlite", "r2d2"] } +diesel_migrations = "2.2.0" hex = "0.4.3" +r2d2 = "0.8.10" serde = { version = "1.0.219", features = ["derive"] } serde_cbor = "0.11.2" tokio = { version = "1.45.0", features = ["full"] } diff --git a/diesel.toml b/diesel.toml new file mode 100644 index 0000000..0010335 --- /dev/null +++ b/diesel.toml @@ -0,0 +1,9 @@ +# For documentation on how to configure this file, +# see https://diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/db/schema.rs" +custom_type_derives = ["diesel::query_builder::QueryId", "Clone"] + +[migrations_directory] +dir = "./migrations" diff --git a/migrations/.keep b/migrations/.keep new file mode 100644 index 0000000..e69de29 diff --git a/migrations/2025-07-12-121135_create_servers_table/down.sql b/migrations/2025-07-12-121135_create_servers_table/down.sql new file mode 100644 index 0000000..34a7a47 --- /dev/null +++ b/migrations/2025-07-12-121135_create_servers_table/down.sql @@ -0,0 +1,5 @@ +-- This file should undo anything in `up.sql` +DROP TABLE servers; +DROP TABLE utxos; +DROP TABLE mempool_tx; +DROP TABLE mempool_inputs; \ No newline at end of file diff --git a/migrations/2025-07-12-121135_create_servers_table/up.sql b/migrations/2025-07-12-121135_create_servers_table/up.sql new file mode 100644 index 0000000..cc063d7 --- /dev/null +++ b/migrations/2025-07-12-121135_create_servers_table/up.sql @@ -0,0 +1,32 @@ +-- Your SQL goes here +CREATE TABLE servers ( + onion_address TEXT PRIMARY KEY, + cooldown_seconds REAL NOT NULL, + stale BOOLEAN NOT NULL +); + +CREATE TABLE utxos ( + txid TEXT NOT NULL, + vout INTEGER NOT NULL, + value INTEGER NOT NULL, + script_pubkey TEXT NOT NULL, + confirmed BOOLEAN NOT NULL DEFAULT true, + spent BOOLEAN NOT NULL DEFAULT false, + spent_by_txid TEXT, + block_height INTEGER, + PRIMARY KEY (txid, vout) +); + + +CREATE TABLE mempool_tx ( + txid TEXT PRIMARY KEY NOT NULL, + seen_at TIMESTAMP NOT NULL +); + +CREATE TABLE mempool_inputs ( + txid TEXT NOT NULL, + input_txid TEXT NOT NULL, + input_vout INTEGER NOT NULL, + FOREIGN KEY(txid) REFERENCES mempool_tx(txid), + FOREIGN KEY(input_txid, input_vout) REFERENCES utxos(txid, vout) +); \ No newline at end of file diff --git a/src/db/db_manager.rs b/src/db/db_manager.rs index 55ab9d7..f69c5ec 100644 --- a/src/db/db_manager.rs +++ b/src/db/db_manager.rs @@ -1,15 +1,25 @@ -use std::collections::HashMap; +use crate::db::model::MempoolTx; +use diesel::RunQueryDsl; +use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl, SqliteConnection, r2d2::ConnectionManager}; +use r2d2::Pool; +use std::{collections::HashMap, sync::Arc}; use tokio::sync::mpsc::Receiver; use tracing::info; use crate::{ + db::schema::{mempool_inputs, mempool_tx}, error::TrackerError, status::{self, Status}, types::{DbRequest, ServerInfo}, }; -pub async fn run(mut rx: Receiver, status_tx: status::Sender) { +pub async fn run( + pool: Arc>>, + mut rx: Receiver, + status_tx: status::Sender, +) { let mut servers: HashMap = HashMap::new(); + let mut conn = pool.get().unwrap(); info!("DB manager started"); while let Some(request) = rx.recv().await { match request { @@ -41,6 +51,19 @@ pub async fn run(mut rx: Receiver, status_tx: status::Sender) { .collect(); let _ = resp_tx.send(response).await; } + DbRequest::WatchUtxo(outpoint, resp_tx) => { + info!("Watch utxo intercepted"); + + let mempool_tx = mempool_tx::table + .inner_join(mempool_inputs::table.on(mempool_tx::txid.eq(mempool_inputs::txid))) + .filter(mempool_inputs::input_txid.eq(outpoint.txid.to_string())) + .filter(mempool_inputs::input_vout.eq(outpoint.vout as i32)) + .select((mempool_tx::txid, mempool_tx::seen_at)) + .load::(&mut conn) + .unwrap(); + + let _ = resp_tx.send(mempool_tx).await; + } } } diff --git a/src/db/mod.rs b/src/db/mod.rs index 5125d19..ed1a281 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,2 +1,4 @@ mod db_manager; pub use db_manager::run; +pub mod model; +pub mod schema; diff --git a/src/db/model.rs b/src/db/model.rs new file mode 100644 index 0000000..db357f3 --- /dev/null +++ b/src/db/model.rs @@ -0,0 +1,38 @@ +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)] +#[diesel(table_name = crate::db::schema::servers)] +pub struct Server { + pub onion_address: String, + pub cooldown_seconds: f32, + pub stale: bool, +} + +#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)] +#[diesel(table_name = crate::db::schema::utxos)] +pub struct Utxo { + pub txid: String, + pub vout: i32, + pub value: i32, + pub script_pubkey: String, + pub confirmed: bool, + pub spent: bool, + pub spent_by_txid: Option, + pub block_height: Option, +} + +#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)] +#[diesel(table_name = crate::db::schema::mempool_tx)] +pub struct MempoolTx { + pub txid: String, + pub seen_at: chrono::NaiveDateTime, +} + +#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)] +#[diesel(table_name = crate::db::schema::mempool_inputs)] +pub struct MempoolInput { + pub txid: String, + pub input_txid: String, + pub input_vout: i32, +} diff --git a/src/db/schema.rs b/src/db/schema.rs new file mode 100644 index 0000000..8aab1d9 --- /dev/null +++ b/src/db/schema.rs @@ -0,0 +1,42 @@ +// @generated automatically by Diesel CLI. + +diesel::table! { + mempool_inputs (rowid) { + rowid -> Integer, + txid -> Text, + input_txid -> Text, + input_vout -> Integer, + } +} + +diesel::table! { + mempool_tx (txid) { + txid -> Text, + seen_at -> Timestamp, + } +} + +diesel::table! { + servers (onion_address) { + onion_address -> Nullable, + cooldown_seconds -> Float, + stale -> Bool, + } +} + +diesel::table! { + utxos (txid, vout) { + txid -> Text, + vout -> Integer, + value -> Integer, + script_pubkey -> Text, + confirmed -> Bool, + spent -> Bool, + spent_by_txid -> Nullable, + block_height -> Nullable, + } +} + +diesel::joinable!(mempool_inputs -> mempool_tx (txid)); + +diesel::allow_tables_to_appear_in_same_query!(mempool_inputs, mempool_tx, servers, utxos,); diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index fac239c..940c4d0 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,3 +1,4 @@ mod tracker_indexer; pub use tracker_indexer::run; mod rpc; +mod utxo_indexer; diff --git a/src/indexer/tracker_indexer.rs b/src/indexer/tracker_indexer.rs index 58b1541..c1b5aff 100644 --- a/src/indexer/tracker_indexer.rs +++ b/src/indexer/tracker_indexer.rs @@ -1,5 +1,7 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; +use diesel::{SqliteConnection, r2d2::ConnectionManager}; +use r2d2::Pool; use tokio::{sync::mpsc::Sender, time::Instant}; use bitcoincore_rpc::bitcoin::absolute::{Height, LockTime}; @@ -7,18 +9,28 @@ use tracing::info; use super::rpc::BitcoinRpc; use crate::{ - handle_result, status, + handle_result, + indexer::utxo_indexer::Indexer, + status, types::{DbRequest, ServerInfo}, }; -pub async fn run(db_tx: Sender, status_tx: status::Sender, client: BitcoinRpc) { +pub async fn run( + pool: Arc>>, + db_tx: Sender, + status_tx: status::Sender, + client: BitcoinRpc, +) { info!("Indexer started"); + let mut utxo_indexer = Indexer::new(pool, &client); let mut last_tip = 0; loop { let blockchain_info = handle_result!(status_tx, client.get_blockchain_info()); let tip_height = blockchain_info.blocks + 1; + utxo_indexer.process_mempool(); for height in last_tip..tip_height { + utxo_indexer.process_block(height); let block_hash = handle_result!(status_tx, client.get_block_hash(height)); let block = handle_result!(status_tx, client.get_block(block_hash)); for tx in block.txdata { diff --git a/src/indexer/utxo_indexer.rs b/src/indexer/utxo_indexer.rs new file mode 100644 index 0000000..c28beb5 --- /dev/null +++ b/src/indexer/utxo_indexer.rs @@ -0,0 +1,136 @@ +use std::sync::Arc; + +use crate::db::model::{MempoolInput, MempoolTx, Utxo}; +use crate::db::schema::{mempool_inputs, mempool_tx, utxos}; +use crate::indexer::rpc::BitcoinRpc; +use chrono::NaiveDateTime; +use diesel::SqliteConnection; +use diesel::prelude::*; +use diesel::r2d2::ConnectionManager; +use r2d2::Pool; + +pub struct Indexer<'a> { + conn: Arc>>, + rpc: &'a BitcoinRpc, +} + +impl<'a> Indexer<'a> { + pub fn new(conn: Arc>>, rpc: &'a BitcoinRpc) -> Self { + Self { conn, rpc } + } + + pub fn process_mempool(&mut self) { + let txids = self.rpc.get_raw_mempool().unwrap(); + let mut conn = self.conn.get().expect("Failed to get DB connection"); + + for txid in txids { + let tx = self.rpc.get_raw_tx(&txid).unwrap(); + + self.insert_mempool_tx(&txid.to_string()); + + for input in &tx.input { + let prevout = &input.previous_output; + diesel::insert_into(mempool_inputs::table) + .values(&MempoolInput { + txid: txid.to_string(), + input_txid: prevout.txid.to_string(), + input_vout: prevout.vout as i32, + }) + .execute(&mut conn) + .unwrap(); + + self.mark_utxo_spent( + &txid.to_string(), + prevout.vout as i32, + Some(&txid.to_string()), + false, + ); + } + + for (vout, out) in tx.output.iter().enumerate() { + let utxo = Utxo { + txid: txid.to_string().clone(), + vout: vout as i32, + value: out.value.to_sat() as i32, + script_pubkey: out.script_pubkey.to_hex_string(), + confirmed: false, + spent: false, + spent_by_txid: None, + block_height: None, + }; + diesel::insert_or_ignore_into(utxos::table) + .values(&utxo) + .execute(&mut conn) + .unwrap(); + } + } + } + + pub fn process_block(&mut self, height: u64) { + let block_hash = self.rpc.get_block_hash(height).unwrap(); + let block = self.rpc.get_block(block_hash).unwrap(); + let mut conn = self.conn.get().expect("Failed to get DB connection"); + + for tx in block.txdata.iter() { + for input in &tx.input { + let prevout = &input.previous_output; + self.mark_utxo_spent( + &prevout.txid.to_string(), + prevout.vout as i32, + Some(&tx.compute_txid().to_string()), + true, + ); + } + + for (vout, out) in tx.output.iter().enumerate() { + let utxo = Utxo { + txid: tx.compute_txid().to_string(), + vout: vout as i32, + value: out.value.to_sat() as i32, + script_pubkey: out.script_pubkey.to_hex_string(), + confirmed: true, + spent: false, + spent_by_txid: None, + block_height: Some(height as i32), + }; + diesel::insert_or_ignore_into(utxos::table) + .values(&utxo) + .execute(&mut conn) + .unwrap(); + } + } + } + + fn insert_mempool_tx(&mut self, txid: &str) { + let mut conn = self.conn.get().expect("Failed to get DB connection"); + diesel::insert_or_ignore_into(mempool_tx::table) + .values(&MempoolTx { + txid: txid.to_string(), + seen_at: NaiveDateTime::MIN, + }) + .execute(&mut conn) + .unwrap(); + } + + fn mark_utxo_spent( + &mut self, + txid: &str, + vout: i32, + spent_by: Option<&str>, + is_confirmed_spend: bool, + ) { + use utxos::dsl; + let mut conn = self.conn.get().expect("Failed to get DB connection"); + + let confirmed_value = is_confirmed_spend; + + diesel::update(dsl::utxos.filter(dsl::txid.eq(txid).and(dsl::vout.eq(vout)))) + .set(( + dsl::spent.eq(true), + dsl::spent_by_txid.eq(spent_by.map(str::to_string)), + dsl::confirmed.eq(confirmed_value), + )) + .execute(&mut conn) + .unwrap(); + } +} diff --git a/src/main.rs b/src/main.rs index c6803ca..c0c6ebc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,14 @@ #![allow(dead_code)] use std::path::Path; +use std::sync::Arc; use bitcoincore_rpc::Auth; use bitcoincore_rpc::Client; use clap::Parser; +use diesel::SqliteConnection; +use diesel::r2d2::ConnectionManager; use error::TrackerError; +use r2d2::Pool; use status::{State, Status}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tor::check_tor_status; @@ -30,7 +34,7 @@ struct App { name = "ADDRESS:PORT", long, short = 'r', - default_value = "127.0.0.1:48332" + default_value = "127.0.0.1:18443" )] pub(crate) rpc: String, #[clap( @@ -103,6 +107,18 @@ impl From for Client { } } +use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; + +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); + +fn run_migrations(pool: Arc>>) { + let mut conn = pool + .get() + .expect("Failed to get DB connection for migrations"); + conn.run_pending_migrations(MIGRATIONS) + .expect("Migration failed"); +} + #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); @@ -111,6 +127,20 @@ async fn main() { let rpc_config = RPCConfig::new(args.rpc, Auth::UserPass(args.auth.0, args.auth.1)); + info!("Connecting to indexer db"); + let database_url = format!("{}/tracker.db", args.datadir); + if let Some(parent) = Path::new(&database_url).parent() { + std::fs::create_dir_all(parent).expect("Failed to create database directory"); + } + let manager = ConnectionManager::::new(database_url); + let pool = Arc::new( + Pool::builder() + .build(manager) + .expect("Failed to create DB pool"), + ); + run_migrations(pool.clone()); + info!("Connected to indexer db"); + check_tor_status(args.control_port, &args.tor_auth_password) .await .expect("Failed to check Tor status"); @@ -140,13 +170,20 @@ async fn main() { let server_address = args.address.clone(); - spawn_db_manager(db_rx, status_tx.clone()).await; - spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), rpc_config.clone().into()).await; + spawn_db_manager(pool.clone(), db_rx, status_tx.clone()).await; + spawn_mempool_indexer( + pool.clone(), + db_tx.clone(), + status_tx.clone(), + rpc_config.clone().into(), + ) + .await; spawn_server( db_tx.clone(), status_tx.clone(), server_address.clone(), args.socks_port, + hostname.clone(), ) .await; @@ -161,7 +198,7 @@ async fn main() { ); let (new_db_tx, new_db_rx) = mpsc::channel::(10); db_tx = new_db_tx; - spawn_db_manager(new_db_rx, status_tx.clone()).await; + spawn_db_manager(pool.clone(), new_db_rx, status_tx.clone()).await; } State::Healthy(info) => { info!("System healthy: {:?}", info); @@ -169,7 +206,7 @@ async fn main() { State::MempoolShutdown(err) => { warn!("Mempool Indexer crashed. Restarting... Error: {:?}", err); let client: Client = rpc_config.clone().into(); - spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), client).await; + spawn_mempool_indexer(pool.clone(), db_tx.clone(), status_tx.clone(), client).await; } State::ServerShutdown(err) => { warn!("Server crashed. Restarting... Error: {:?}", err); @@ -178,6 +215,7 @@ async fn main() { status_tx.clone(), server_address.clone(), args.socks_port, + hostname.clone(), ) .await; } @@ -185,18 +223,24 @@ async fn main() { } } -async fn spawn_db_manager(db_tx: Receiver, status_tx: Sender) { +async fn spawn_db_manager( + pool: Arc>>, + db_tx: Receiver, + status_tx: Sender, +) { info!("Spawning db manager"); - tokio::spawn(db::run(db_tx, status::Sender::DBManager(status_tx))); + tokio::spawn(db::run(pool, db_tx, status::Sender::DBManager(status_tx))); } async fn spawn_mempool_indexer( + pool: Arc>>, db_tx: Sender, status_tx: Sender, client: Client, ) { info!("Spawning indexer"); tokio::spawn(indexer::run( + pool, db_tx, status::Sender::Mempool(status_tx), client.into(), @@ -208,6 +252,7 @@ async fn spawn_server( status_tx: Sender, address: String, socks_port: u16, + hostname: String, ) { info!("Spawning server instance"); tokio::spawn(server::run( @@ -215,5 +260,6 @@ async fn spawn_server( status::Sender::Server(status_tx), address, socks_port, + hostname, )); } diff --git a/src/server/tracker_monitor.rs b/src/server/tracker_monitor.rs index cbbf3a1..9719045 100644 --- a/src/server/tracker_monitor.rs +++ b/src/server/tracker_monitor.rs @@ -13,7 +13,7 @@ use crate::{ handle_result, server::send_message_with_prefix, status, - types::{DbRequest, ServerInfo, TrackerRequest, TrackerResponse}, + types::{DbRequest, ServerInfo, TrackerClientToServer, TrackerServerToClient}, utils::read_message, }; @@ -24,6 +24,8 @@ pub async fn monitor_systems( db_tx: Sender, status_tx: status::Sender, socks_port: u16, + onion_address: String, + port: u16, ) -> Result<(), TrackerError> { info!("Starting to monitor other maker services"); @@ -59,11 +61,14 @@ pub async fn monitor_systems( let mut writer = BufWriter::new(write_half); - let message = TrackerResponse::Ping; + let message = TrackerServerToClient::Ping { + address: onion_address.clone(), + port, + }; _ = send_message_with_prefix(&mut writer, &message).await; let buffer = handle_result!(status_tx, read_message(&mut reader).await); - let response: TrackerRequest = + let response: TrackerClientToServer = match serde_cbor::de::from_reader(&buffer[..]) { Ok(resp) => resp, Err(e) => { @@ -73,7 +78,7 @@ pub async fn monitor_systems( } }; - if let TrackerRequest::Pong { address } = response { + if let TrackerClientToServer::Pong { address } = response { let updated_info = ServerInfo { onion_address: address.clone(), cooldown: Instant::now(), diff --git a/src/server/tracker_server.rs b/src/server/tracker_server.rs index 367310c..37c8a8d 100644 --- a/src/server/tracker_server.rs +++ b/src/server/tracker_server.rs @@ -1,8 +1,9 @@ +use crate::db::model::MempoolTx; use crate::server::tracker_monitor::monitor_systems; use crate::status; use crate::types::DbRequest; -use crate::types::TrackerRequest; -use crate::types::TrackerResponse; +use crate::types::TrackerClientToServer; +use crate::types::TrackerServerToClient; use crate::utils::read_message; use crate::utils::send_message; use tokio::io::BufReader; @@ -19,13 +20,20 @@ pub async fn run( status_tx: status::Sender, address: String, socks_port: u16, + onion_address: String, ) -> Result<(), Box> { + let port = address + .rsplit_once(':') + .and_then(|(_, port)| port.parse::().ok()) + .unwrap_or(8080); let server = TcpListener::bind(&address).await?; tokio::spawn(monitor_systems( db_tx.clone(), status_tx.clone(), socks_port, + onion_address, + port, )); info!("Tracker server listening on {}", address); @@ -56,7 +64,7 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { } }; - let request: TrackerRequest = match serde_cbor::de::from_reader(&buffer[..]) { + let request: TrackerClientToServer = match serde_cbor::de::from_reader(&buffer[..]) { Ok(r) => r, Err(e) => { error!("Failed to deserialize client request: {e}"); @@ -65,7 +73,7 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { }; match request { - TrackerRequest::Get => { + TrackerClientToServer::Get => { info!("Received Get request taker"); let (resp_tx, mut resp_rx) = mpsc::channel(1); let db_request = DbRequest::QueryActive(resp_tx); @@ -79,7 +87,7 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { info!("Response: {:?}", response); if let Some(addresses) = response { - let message = TrackerResponse::Address { addresses }; + let message = TrackerServerToClient::Address { addresses }; if let Err(e) = send_message(&mut writer, &message).await { error!("Failed to send response to client: {e}"); break; @@ -87,13 +95,36 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { } } - TrackerRequest::Post { metadata: _ } => { + TrackerClientToServer::Post { metadata: _ } => { todo!() } - TrackerRequest::Pong { address: _ } => { + TrackerClientToServer::Pong { address: _ } => { todo!() } + TrackerClientToServer::Watch { outpoint } => { + info!("Received a watch request from client: {outpoint:?}"); + + let (resp_tx, mut resp_rx) = mpsc::channel::>(1); + + let db_request = DbRequest::WatchUtxo(outpoint, resp_tx); + + if let Err(e) = db_tx.send(db_request).await { + error!("Failed to send DB request: {e}"); + break; + } + + let response = resp_rx.recv().await; + info!("Response: {:?}", response); + + if let Some(mempool_tx) = response { + let message = TrackerServerToClient::WatchResponse { mempool_tx }; + if let Err(e) = send_message(&mut writer, &message).await { + error!("Failed to send response to client: {e}"); + break; + } + } + } } } diff --git a/src/types.rs b/src/types.rs index 9ce9060..13fbd26 100644 --- a/src/types.rs +++ b/src/types.rs @@ -5,6 +5,8 @@ use bitcoincore_rpc::bitcoin::{ use serde::{Deserialize, Serialize}; use tokio::{sync::mpsc::Sender, time::Instant}; +use crate::db::model::MempoolTx; + #[derive(Debug, Clone)] pub struct ServerInfo { pub onion_address: String, @@ -18,6 +20,7 @@ pub enum DbRequest { Update(String, ServerInfo), QueryAll(Sender>), QueryActive(Sender>), + WatchUtxo(OutPoint, Sender>), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Hash)] @@ -54,7 +57,7 @@ pub struct DnsMetadata { #[derive(Serialize, Deserialize, Debug)] #[allow(clippy::large_enum_variant)] -pub enum TrackerRequest { +pub enum TrackerClientToServer { /// A request sent by the maker to register itself with the DNS server and authenticate. Post { /// Metadata containing the maker's URL and fidelity proof. @@ -63,11 +66,17 @@ pub enum TrackerRequest { /// A request sent by the taker to fetch all valid maker addresses from the DNS server. Get, /// To gauge server activity - Pong { address: String }, + Pong { + address: String, + }, + Watch { + outpoint: OutPoint, + }, } #[derive(Serialize, Deserialize, Debug)] -pub enum TrackerResponse { +pub enum TrackerServerToClient { Address { addresses: Vec }, - Ping, + Ping { address: String, port: u16 }, + WatchResponse { mempool_tx: Vec }, }