From 46537a36632e7ae2595d09d70ca31a3eea8c9961 Mon Sep 17 00:00:00 2001 From: Wukong Date: Sat, 12 Jul 2025 18:01:59 +0530 Subject: [PATCH 1/8] add diesel and create migrations --- Cargo.lock | 316 ++++++++++++++++++ Cargo.toml | 1 + diesel.toml | 9 + migrations/.keep | 0 .../down.sql | 5 + .../up.sql | 32 ++ src/db/schema.rs | 47 +++ tracker.db | Bin 0 -> 40960 bytes 8 files changed, 410 insertions(+) create mode 100644 diesel.toml create mode 100644 migrations/.keep create mode 100644 migrations/2025-07-12-121135_create_servers_table/down.sql create mode 100644 migrations/2025-07-12-121135_create_servers_table/up.sql create mode 100644 src/db/schema.rs create mode 100644 tracker.db diff --git a/Cargo.lock b/Cargo.lock index 959d9dc..69bf9a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.18" @@ -209,6 +224,12 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "bumpalo" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" + [[package]] name = "bytes" version = "1.10.1" @@ -230,6 +251,18 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "windows-link", +] + [[package]] name = "clap" version = "4.5.37" @@ -276,12 +309,116 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn", +] + +[[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + +[[package]] +name = "diesel" +version = "2.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "229850a212cd9b84d4f0290ad9d294afc0ae70fccaa8949dbe8b43ffafa1e20c" +dependencies = [ + "chrono", + "diesel_derives", + "libsqlite3-sys", + "time", +] + +[[package]] +name = "diesel_derives" +version = "2.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b96984c469425cb577bf6f17121ecb3e4fe1e81de5d8f780dd372802858d756" +dependencies = [ + "diesel_table_macro_syntax", + "dsl_auto_type", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "diesel_table_macro_syntax" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "209c735641a413bc68c4923a9d6ad4bcb3ca306b794edaa7eb0b3228a99ffb25" +dependencies = [ + "syn", +] + +[[package]] +name = "dsl_auto_type" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139ae9aca7527f85f26dd76483eb38533fd84bd571065da1739656ef71c5ff5b" +dependencies = [ + "darling", + "either", + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "futures-core" version = "0.3.31" @@ -375,6 +512,36 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3011d1213f159867b13cfd6ac92d2cd5f1345762c63be3554e84092d85a50bbd" +[[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -387,6 +554,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "js-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "jsonrpc" version = "0.18.0" @@ -411,6 +588,16 @@ version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +[[package]] +name = "libsqlite3-sys" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" +dependencies = [ + "pkg-config", + "vcpkg", +] + [[package]] name = "lock_api" version = "0.4.12" @@ -499,6 +686,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.7" @@ -555,6 +757,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -854,6 +1068,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + +[[package]] +name = "time-macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tokio" version = "1.45.0" @@ -988,6 +1233,7 @@ version = "0.1.0" dependencies = [ "bitcoincore-rpc", "clap", + "diesel", "hex", "serde", "serde_cbor", @@ -1017,12 +1263,76 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1090,6 +1400,12 @@ dependencies = [ "syn", ] +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + [[package]] name = "windows-result" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 4b44e6e..cb08500 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] bitcoincore-rpc = "0.19.0" clap = { version = "4.5.37", features = ["derive"] } +diesel = { version = "2.2.12", features = ["chrono", "sqlite"] } hex = "0.4.3" serde = { version = "1.0.219", features = ["derive"] } serde_cbor = "0.11.2" diff --git a/diesel.toml b/diesel.toml new file mode 100644 index 0000000..0010335 --- /dev/null +++ b/diesel.toml @@ -0,0 +1,9 @@ +# For documentation on how to configure this file, +# see https://diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/db/schema.rs" +custom_type_derives = ["diesel::query_builder::QueryId", "Clone"] + +[migrations_directory] +dir = "./migrations" diff --git a/migrations/.keep b/migrations/.keep new file mode 100644 index 0000000..e69de29 diff --git a/migrations/2025-07-12-121135_create_servers_table/down.sql b/migrations/2025-07-12-121135_create_servers_table/down.sql new file mode 100644 index 0000000..34a7a47 --- /dev/null +++ b/migrations/2025-07-12-121135_create_servers_table/down.sql @@ -0,0 +1,5 @@ +-- This file should undo anything in `up.sql` +DROP TABLE servers; +DROP TABLE utxos; +DROP TABLE mempool_tx; +DROP TABLE mempool_inputs; \ No newline at end of file diff --git a/migrations/2025-07-12-121135_create_servers_table/up.sql b/migrations/2025-07-12-121135_create_servers_table/up.sql new file mode 100644 index 0000000..1c33a6d --- /dev/null +++ b/migrations/2025-07-12-121135_create_servers_table/up.sql @@ -0,0 +1,32 @@ +-- Your SQL goes here +CREATE TABLE servers ( + onion_address TEXT PRIMARY KEY, + cooldown_seconds REAL NOT NULL, + stale BOOLEAN NOT NULL +); + +CREATE TABLE utxos ( + txid TEXT NOT NULL, + vout INTEGER NOT NULL, + value INTEGER NOT NULL, + script_pubkey TEXT NOT NULL, + confirmed BOOLEAN NOT NULL DEFAULT true, + spent BOOLEAN NOT NULL DEFAULT false, + spent_by_txid TEXT, + block_height INTEGER, + PRIMARY KEY (txid, vout) +); + + +CREATE TABLE mempool_tx ( + txid TEXT PRIMARY KEY, + seen_at TIMESTAMP NOT NULL +); + +CREATE TABLE mempool_inputs ( + txid TEXT NOT NULL, + input_txid TEXT NOT NULL, + input_vout INTEGER NOT NULL, + FOREIGN KEY(txid) REFERENCES mempool_tx(txid), + FOREIGN KEY(input_txid, input_vout) REFERENCES utxos(txid, vout) +); \ No newline at end of file diff --git a/src/db/schema.rs b/src/db/schema.rs new file mode 100644 index 0000000..8773350 --- /dev/null +++ b/src/db/schema.rs @@ -0,0 +1,47 @@ +// @generated automatically by Diesel CLI. + +diesel::table! { + mempool_inputs (rowid) { + rowid -> Integer, + txid -> Text, + input_txid -> Text, + input_vout -> Integer, + } +} + +diesel::table! { + mempool_tx (txid) { + txid -> Nullable, + seen_at -> Timestamp, + } +} + +diesel::table! { + servers (onion_address) { + onion_address -> Nullable, + cooldown_seconds -> Float, + stale -> Bool, + } +} + +diesel::table! { + utxos (txid, vout) { + txid -> Text, + vout -> Integer, + value -> Integer, + script_pubkey -> Text, + confirmed -> Bool, + spent -> Bool, + spent_by_txid -> Nullable, + block_height -> Nullable, + } +} + +diesel::joinable!(mempool_inputs -> mempool_tx (txid)); + +diesel::allow_tables_to_appear_in_same_query!( + mempool_inputs, + mempool_tx, + servers, + utxos, +); diff --git a/tracker.db b/tracker.db new file mode 100644 index 0000000000000000000000000000000000000000..e8e1f9c397659433f3e99a8a8f695a80fe17c71b GIT binary patch literal 40960 zcmeI(zf$8y9Kdl2gAL9BmnJG`mYEc21~S+j!{joViOvy(#F&dPxj)U0u@}&sZ8?%m zd?n=it5fG??gi4M%mbuOnYC<;b)2q97teRG&`P_Ke*4+AX2hbCAFF<>#CZ^Q8nM_h z9vY@;d?SQm7;}0l>g6^q=*;}>hF+Uf`K;`m@x`<6mskHWmWuC;m3OPZuNW)8J$SqP z$0vV1c&T@h5I_I{1Q0*~0R#~EPXagAQo*uq^H)D^oV8U)b$UV2mVUR_kE8TfvEsQ- z-4%7`N!1nUGOiIOCF<_;x~LuN<+NJeO!g1`NU5&W&kgm%Blo2496g;hxM=Sz z=B&Fb)=*K~_E$^-_OLzIDo617LvMlqBN}^4gll;o3VoY}33oqSQ zAL)myM!T;v`Y39KelM22{@E+_`a@%&8FbJ6u%lYy$?YX{9&@>2O1my=;NDw*mHk?wLkpSQlYvgi{$SxQ!g z>(6opYi-TE$tB>C3a?ZcjaTz2^zqIyd(ia*-S@3lsG?}(G!@V&!lwQTwt`=}GE!Qt z73tybR3}_xp}5gj_k@b})_mS_*0S0#jzyNnnT6};X2EhC^X99h9a6S@6{)t2nwP56 zkRAUbY{WX^D7$1X)!OWOV*q`d7*gqTik}>>@}1*t>~7m)3gNw$>I1@lSL*urzb=!Z zQ#tkY{h%%T;95HvLv$Ol%?#loW98}4sW!}dNEi3H1vmr00IagfB*srAb|EkA@Tc=IY;u{*Vwr z009ILKmY**5I_I{1Q0-ACImjs*-Pm^2@L1|^*{ZS5I_I{1Q0*~0R#|0009ILKwyRi zk{A4W{-5C{Y&c*KmY**5I_I{1Q0*~0R#{j3h@3v;{X8!5I_I{1Q0*~0R#|00D;*TVE%vh v` Date: Mon, 21 Jul 2025 10:25:20 +0530 Subject: [PATCH 2/8] tracker.db in the .gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index f687a5b..0d96e05 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target .ra-target -.tracker \ No newline at end of file +.tracker +tracker.db \ No newline at end of file From d51d62e2f77c067879a87fc677e374ef4e57a162 Mon Sep 17 00:00:00 2001 From: Wukong Date: Mon, 21 Jul 2025 11:11:39 +0530 Subject: [PATCH 3/8] add db connection state and pass to db_manager --- Cargo.lock | 26 ++++++++++++++++++++++++++ Cargo.toml | 4 +++- src/db/db_manager.rs | 6 ++++-- src/db/mod.rs | 1 + src/main.rs | 21 ++++++++++++++++----- 5 files changed, 50 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69bf9a0..81d838d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -259,7 +259,10 @@ checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", + "serde", + "wasm-bindgen", "windows-link", ] @@ -368,6 +371,7 @@ dependencies = [ "chrono", "diesel_derives", "libsqlite3-sys", + "r2d2", "time", ] @@ -796,6 +800,17 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + [[package]] name = "rand" version = "0.8.5" @@ -897,6 +912,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -1232,9 +1256,11 @@ name = "tracker" version = "0.1.0" dependencies = [ "bitcoincore-rpc", + "chrono", "clap", "diesel", "hex", + "r2d2", "serde", "serde_cbor", "tokio", diff --git a/Cargo.toml b/Cargo.toml index cb08500..23e25c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,11 @@ edition = "2024" [dependencies] bitcoincore-rpc = "0.19.0" +chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.5.37", features = ["derive"] } -diesel = { version = "2.2.12", features = ["chrono", "sqlite"] } +diesel = { version = "2.2.12", features = ["chrono", "sqlite", "r2d2"] } hex = "0.4.3" +r2d2 = "0.8.10" serde = { version = "1.0.219", features = ["derive"] } serde_cbor = "0.11.2" tokio = { version = "1.45.0", features = ["full"] } diff --git a/src/db/db_manager.rs b/src/db/db_manager.rs index 55ab9d7..895723b 100644 --- a/src/db/db_manager.rs +++ b/src/db/db_manager.rs @@ -1,4 +1,6 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; +use diesel::{r2d2::ConnectionManager, SqliteConnection}; +use r2d2::Pool; use tokio::sync::mpsc::Receiver; use tracing::info; @@ -8,7 +10,7 @@ use crate::{ types::{DbRequest, ServerInfo}, }; -pub async fn run(mut rx: Receiver, status_tx: status::Sender) { +pub async fn run(pool: Arc>>, mut rx: Receiver, status_tx: status::Sender) { let mut servers: HashMap = HashMap::new(); info!("DB manager started"); while let Some(request) = rx.recv().await { diff --git a/src/db/mod.rs b/src/db/mod.rs index 5125d19..b6e7ab5 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,2 +1,3 @@ mod db_manager; pub use db_manager::run; +mod schema; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index c6803ca..b5b7d52 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,14 @@ #![allow(dead_code)] use std::path::Path; +use std::sync::Arc; use bitcoincore_rpc::Auth; use bitcoincore_rpc::Client; use clap::Parser; +use diesel::r2d2::ConnectionManager; +use diesel::SqliteConnection; use error::TrackerError; +use r2d2::Pool; use status::{State, Status}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tor::check_tor_status; @@ -30,7 +34,7 @@ struct App { name = "ADDRESS:PORT", long, short = 'r', - default_value = "127.0.0.1:48332" + default_value = "127.0.0.1:18443" )] pub(crate) rpc: String, #[clap( @@ -111,6 +115,13 @@ async fn main() { let rpc_config = RPCConfig::new(args.rpc, Auth::UserPass(args.auth.0, args.auth.1)); + info!("Connecting to indexer db"); + let database_url = format!("{}/tracker.db", args.datadir); + let manager = ConnectionManager::::new(database_url); + let pool = Arc::new(Pool::builder().build(manager).expect("Failed to create DB pool")); + info!("Connected to indexer db"); + + check_tor_status(args.control_port, &args.tor_auth_password) .await .expect("Failed to check Tor status"); @@ -140,7 +151,7 @@ async fn main() { let server_address = args.address.clone(); - spawn_db_manager(db_rx, status_tx.clone()).await; + spawn_db_manager(pool.clone(), db_rx, status_tx.clone()).await; spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), rpc_config.clone().into()).await; spawn_server( db_tx.clone(), @@ -161,7 +172,7 @@ async fn main() { ); let (new_db_tx, new_db_rx) = mpsc::channel::(10); db_tx = new_db_tx; - spawn_db_manager(new_db_rx, status_tx.clone()).await; + spawn_db_manager(pool.clone(), new_db_rx, status_tx.clone()).await; } State::Healthy(info) => { info!("System healthy: {:?}", info); @@ -185,9 +196,9 @@ async fn main() { } } -async fn spawn_db_manager(db_tx: Receiver, status_tx: Sender) { +async fn spawn_db_manager(pool: Arc>>,db_tx: Receiver, status_tx: Sender) { info!("Spawning db manager"); - tokio::spawn(db::run(db_tx, status::Sender::DBManager(status_tx))); + tokio::spawn(db::run(pool, db_tx, status::Sender::DBManager(status_tx))); } async fn spawn_mempool_indexer( From 09b17073e761c3f05548f63168639667316f95cc Mon Sep 17 00:00:00 2001 From: Wukong Date: Mon, 21 Jul 2025 11:17:12 +0530 Subject: [PATCH 4/8] add model to db module --- src/db/db_manager.rs | 10 +++++++--- src/db/mod.rs | 3 ++- src/db/model.rs | 38 ++++++++++++++++++++++++++++++++++++++ src/db/schema.rs | 7 +------ src/main.rs | 15 +++++++++++---- 5 files changed, 59 insertions(+), 14 deletions(-) create mode 100644 src/db/model.rs diff --git a/src/db/db_manager.rs b/src/db/db_manager.rs index 895723b..4743d53 100644 --- a/src/db/db_manager.rs +++ b/src/db/db_manager.rs @@ -1,6 +1,6 @@ -use std::{collections::HashMap, sync::Arc}; -use diesel::{r2d2::ConnectionManager, SqliteConnection}; +use diesel::{SqliteConnection, r2d2::ConnectionManager}; use r2d2::Pool; +use std::{collections::HashMap, sync::Arc}; use tokio::sync::mpsc::Receiver; use tracing::info; @@ -10,7 +10,11 @@ use crate::{ types::{DbRequest, ServerInfo}, }; -pub async fn run(pool: Arc>>, mut rx: Receiver, status_tx: status::Sender) { +pub async fn run( + pool: Arc>>, + mut rx: Receiver, + status_tx: status::Sender, +) { let mut servers: HashMap = HashMap::new(); info!("DB manager started"); while let Some(request) = rx.recv().await { diff --git a/src/db/mod.rs b/src/db/mod.rs index b6e7ab5..a134b60 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,3 +1,4 @@ mod db_manager; pub use db_manager::run; -mod schema; \ No newline at end of file +mod model; +mod schema; diff --git a/src/db/model.rs b/src/db/model.rs new file mode 100644 index 0000000..db357f3 --- /dev/null +++ b/src/db/model.rs @@ -0,0 +1,38 @@ +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)] +#[diesel(table_name = crate::db::schema::servers)] +pub struct Server { + pub onion_address: String, + pub cooldown_seconds: f32, + pub stale: bool, +} + +#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)] +#[diesel(table_name = crate::db::schema::utxos)] +pub struct Utxo { + pub txid: String, + pub vout: i32, + pub value: i32, + pub script_pubkey: String, + pub confirmed: bool, + pub spent: bool, + pub spent_by_txid: Option, + pub block_height: Option, +} + +#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)] +#[diesel(table_name = crate::db::schema::mempool_tx)] +pub struct MempoolTx { + pub txid: String, + pub seen_at: chrono::NaiveDateTime, +} + +#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)] +#[diesel(table_name = crate::db::schema::mempool_inputs)] +pub struct MempoolInput { + pub txid: String, + pub input_txid: String, + pub input_vout: i32, +} diff --git a/src/db/schema.rs b/src/db/schema.rs index 8773350..4135c0e 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -39,9 +39,4 @@ diesel::table! { diesel::joinable!(mempool_inputs -> mempool_tx (txid)); -diesel::allow_tables_to_appear_in_same_query!( - mempool_inputs, - mempool_tx, - servers, - utxos, -); +diesel::allow_tables_to_appear_in_same_query!(mempool_inputs, mempool_tx, servers, utxos,); diff --git a/src/main.rs b/src/main.rs index b5b7d52..c9d461f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use bitcoincore_rpc::Auth; use bitcoincore_rpc::Client; use clap::Parser; -use diesel::r2d2::ConnectionManager; use diesel::SqliteConnection; +use diesel::r2d2::ConnectionManager; use error::TrackerError; use r2d2::Pool; use status::{State, Status}; @@ -118,10 +118,13 @@ async fn main() { info!("Connecting to indexer db"); let database_url = format!("{}/tracker.db", args.datadir); let manager = ConnectionManager::::new(database_url); - let pool = Arc::new(Pool::builder().build(manager).expect("Failed to create DB pool")); + let pool = Arc::new( + Pool::builder() + .build(manager) + .expect("Failed to create DB pool"), + ); info!("Connected to indexer db"); - check_tor_status(args.control_port, &args.tor_auth_password) .await .expect("Failed to check Tor status"); @@ -196,7 +199,11 @@ async fn main() { } } -async fn spawn_db_manager(pool: Arc>>,db_tx: Receiver, status_tx: Sender) { +async fn spawn_db_manager( + pool: Arc>>, + db_tx: Receiver, + status_tx: Sender, +) { info!("Spawning db manager"); tokio::spawn(db::run(pool, db_tx, status::Sender::DBManager(status_tx))); } From ac8bfd8a64904a354f7de34f6645c9d194034479 Mon Sep 17 00:00:00 2001 From: Wukong Date: Mon, 21 Jul 2025 13:12:49 +0530 Subject: [PATCH 5/8] add indexer --- src/db/mod.rs | 4 +- src/indexer/mod.rs | 1 + src/indexer/utxo_indexer.rs | 115 ++++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 src/indexer/utxo_indexer.rs diff --git a/src/db/mod.rs b/src/db/mod.rs index a134b60..ed1a281 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,4 +1,4 @@ mod db_manager; pub use db_manager::run; -mod model; -mod schema; +pub mod model; +pub mod schema; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index fac239c..b6d48c2 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,3 +1,4 @@ mod tracker_indexer; pub use tracker_indexer::run; mod rpc; +mod utxo_indexer; \ No newline at end of file diff --git a/src/indexer/utxo_indexer.rs b/src/indexer/utxo_indexer.rs new file mode 100644 index 0000000..6dd4f88 --- /dev/null +++ b/src/indexer/utxo_indexer.rs @@ -0,0 +1,115 @@ +use chrono::NaiveDateTime; +use diesel::prelude::*; +use diesel::SqliteConnection; +use crate::db::schema::{mempool_inputs, mempool_tx, utxos}; +use crate::db::model::{MempoolInput, MempoolTx, Utxo}; +use crate::indexer::rpc::BitcoinRpc; + +pub struct Indexer<'a> { + conn: &'a mut SqliteConnection, + rpc: &'a BitcoinRpc, +} + +impl<'a> Indexer<'a> { + pub fn new(conn: &'a mut SqliteConnection, rpc: &'a BitcoinRpc) -> Self { + Self { conn, rpc } + } + + pub fn process_mempool(&mut self) { + let txids = self.rpc.get_raw_mempool().unwrap(); + + for txid in txids { + let tx = self.rpc.get_raw_tx(&txid).unwrap(); + + self.insert_mempool_tx(&txid.to_string()); + + for input in &tx.input { + let prevout = &input.previous_output; + diesel::insert_into(mempool_inputs::table) + .values(&MempoolInput { + txid: txid.to_string(), + input_txid: prevout.txid.to_string(), + input_vout: prevout.vout as i32, + }) + .execute(self.conn).unwrap(); + + self.mark_utxo_spent(&txid.to_string(), prevout.vout as i32, Some(&txid.to_string()), false); + + } + + for (vout, out) in tx.output.iter().enumerate() { + let utxo = Utxo { + txid: txid.to_string().clone(), + vout: vout as i32, + value: out.value.to_sat() as i32, + script_pubkey: out.script_pubkey.to_hex_string(), + confirmed: false, + spent: false, + spent_by_txid: None, + block_height: None, + }; + diesel::insert_or_ignore_into(utxos::table) + .values(&utxo) + .execute(self.conn).unwrap(); + } + } + } + + pub fn process_block(&mut self, height: u64) { + let block_hash = self.rpc.get_block_hash(height).unwrap(); + let block = self.rpc.get_block(block_hash).unwrap(); + + for tx in block.txdata.iter() { + for input in &tx.input { + let prevout = &input.previous_output; + self.mark_utxo_spent(&prevout.txid.to_string(), prevout.vout as i32, Some(&tx.compute_txid().to_string()), true); + } + + for (vout, out) in tx.output.iter().enumerate() { + let utxo = Utxo { + txid: tx.compute_txid().to_string(), + vout: vout as i32, + value: out.value.to_sat() as i32, + script_pubkey: out.script_pubkey.to_hex_string(), + confirmed: true, + spent: false, + spent_by_txid: None, + block_height: Some(height as i32), + }; + diesel::insert_or_ignore_into(utxos::table) + .values(&utxo) + .execute(self.conn).unwrap(); + } + } + } + + fn insert_mempool_tx(&mut self, txid: &str) { + diesel::insert_or_ignore_into(mempool_tx::table) + .values(&MempoolTx { + txid: txid.to_string(), + seen_at: NaiveDateTime::MIN, + }) + .execute(self.conn).unwrap(); + } + + fn mark_utxo_spent( + &mut self, + txid: &str, + vout: i32, + spent_by: Option<&str>, + is_confirmed_spend: bool, + ){ + use utxos::dsl; + + let confirmed_value = is_confirmed_spend; + + diesel::update(dsl::utxos.filter(dsl::txid.eq(txid).and(dsl::vout.eq(vout)))) + .set(( + dsl::spent.eq(true), + dsl::spent_by_txid.eq(spent_by.map(str::to_string)), + dsl::confirmed.eq(confirmed_value), + )) + .execute(self.conn).unwrap(); + + } +} From 7bb228d00d61f93baa55bb070ca708911d26d656 Mon Sep 17 00:00:00 2001 From: Wukong Date: Sat, 26 Jul 2025 10:01:35 +0530 Subject: [PATCH 6/8] add utxo watcher --- Cargo.lock | 109 ++++++++++++++++++ Cargo.toml | 1 + .../up.sql | 2 +- src/db/db_manager.rs | 19 ++- src/db/schema.rs | 2 +- src/indexer/mod.rs | 2 +- src/indexer/tracker_indexer.rs | 18 ++- src/indexer/utxo_indexer.rs | 57 ++++++--- src/main.rs | 28 ++++- src/server/tracker_server.rs | 24 ++++ src/types.rs | 11 +- tracker.db | Bin 40960 -> 0 bytes 12 files changed, 245 insertions(+), 28 deletions(-) delete mode 100644 tracker.db diff --git a/Cargo.lock b/Cargo.lock index 81d838d..9a83cb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -388,6 +388,17 @@ dependencies = [ "syn", ] +[[package]] +name = "diesel_migrations" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a73ce704bad4231f001bff3314d91dce4aba0770cee8b233991859abc15c1f6" +dependencies = [ + "diesel", + "migrations_internals", + "migrations_macros", +] + [[package]] name = "diesel_table_macro_syntax" version = "0.2.0" @@ -417,6 +428,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "fnv" version = "1.0.7" @@ -489,6 +506,12 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b43ede17f21864e81be2fa654110bf1e793774238d86ef8555c37e6519c0403" +[[package]] +name = "hashbrown" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" + [[package]] name = "heck" version = "0.5.0" @@ -546,6 +569,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "indexmap" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -649,6 +682,27 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "migrations_internals" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bda1634d70d5bd53553cf15dca9842a396e8c799982a3ad22998dc44d961f24" +dependencies = [ + "serde", + "toml", +] + +[[package]] +name = "migrations_macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb161cc72176cb37aa47f1fc520d3ef02263d67d661f44f05d05a079e1237fd" +dependencies = [ + "migrations_internals", + "proc-macro2", + "quote", +] + [[package]] name = "miniz_oxide" version = "0.8.8" @@ -996,6 +1050,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40734c41988f7306bb04f0ecf60ec0f3f1caa34290e4e8ea471dcd3346483b83" +dependencies = [ + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1190,6 +1253,45 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed0aee96c12fa71097902e0bb061a5e1ebd766a6636bb605ba401c45c1650eac" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "toml_parser", + "toml_writer", + "winnow", +] + +[[package]] +name = "toml_datetime" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bade1c3e902f58d73d3f294cd7f20391c1cb2fbcb643b73566bc773971df91e3" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_parser" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97200572db069e74c512a14117b296ba0a80a30123fbbb5aa1f4a348f639ca30" +dependencies = [ + "winnow", +] + +[[package]] +name = "toml_writer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64" + [[package]] name = "tracing" version = "0.1.41" @@ -1259,6 +1361,7 @@ dependencies = [ "chrono", "clap", "diesel", + "diesel_migrations", "hex", "r2d2", "serde", @@ -1533,6 +1636,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95" + [[package]] name = "zerocopy" version = "0.8.25" diff --git a/Cargo.toml b/Cargo.toml index 23e25c9..bebbbcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ bitcoincore-rpc = "0.19.0" chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.5.37", features = ["derive"] } diesel = { version = "2.2.12", features = ["chrono", "sqlite", "r2d2"] } +diesel_migrations = "2.2.0" hex = "0.4.3" r2d2 = "0.8.10" serde = { version = "1.0.219", features = ["derive"] } diff --git a/migrations/2025-07-12-121135_create_servers_table/up.sql b/migrations/2025-07-12-121135_create_servers_table/up.sql index 1c33a6d..cc063d7 100644 --- a/migrations/2025-07-12-121135_create_servers_table/up.sql +++ b/migrations/2025-07-12-121135_create_servers_table/up.sql @@ -19,7 +19,7 @@ CREATE TABLE utxos ( CREATE TABLE mempool_tx ( - txid TEXT PRIMARY KEY, + txid TEXT PRIMARY KEY NOT NULL, seen_at TIMESTAMP NOT NULL ); diff --git a/src/db/db_manager.rs b/src/db/db_manager.rs index 4743d53..f69c5ec 100644 --- a/src/db/db_manager.rs +++ b/src/db/db_manager.rs @@ -1,10 +1,13 @@ -use diesel::{SqliteConnection, r2d2::ConnectionManager}; +use crate::db::model::MempoolTx; +use diesel::RunQueryDsl; +use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl, SqliteConnection, r2d2::ConnectionManager}; use r2d2::Pool; use std::{collections::HashMap, sync::Arc}; use tokio::sync::mpsc::Receiver; use tracing::info; use crate::{ + db::schema::{mempool_inputs, mempool_tx}, error::TrackerError, status::{self, Status}, types::{DbRequest, ServerInfo}, @@ -16,6 +19,7 @@ pub async fn run( status_tx: status::Sender, ) { let mut servers: HashMap = HashMap::new(); + let mut conn = pool.get().unwrap(); info!("DB manager started"); while let Some(request) = rx.recv().await { match request { @@ -47,6 +51,19 @@ pub async fn run( .collect(); let _ = resp_tx.send(response).await; } + DbRequest::WatchUtxo(outpoint, resp_tx) => { + info!("Watch utxo intercepted"); + + let mempool_tx = mempool_tx::table + .inner_join(mempool_inputs::table.on(mempool_tx::txid.eq(mempool_inputs::txid))) + .filter(mempool_inputs::input_txid.eq(outpoint.txid.to_string())) + .filter(mempool_inputs::input_vout.eq(outpoint.vout as i32)) + .select((mempool_tx::txid, mempool_tx::seen_at)) + .load::(&mut conn) + .unwrap(); + + let _ = resp_tx.send(mempool_tx).await; + } } } diff --git a/src/db/schema.rs b/src/db/schema.rs index 4135c0e..8aab1d9 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -11,7 +11,7 @@ diesel::table! { diesel::table! { mempool_tx (txid) { - txid -> Nullable, + txid -> Text, seen_at -> Timestamp, } } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index b6d48c2..940c4d0 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,4 +1,4 @@ mod tracker_indexer; pub use tracker_indexer::run; mod rpc; -mod utxo_indexer; \ No newline at end of file +mod utxo_indexer; diff --git a/src/indexer/tracker_indexer.rs b/src/indexer/tracker_indexer.rs index 58b1541..c1b5aff 100644 --- a/src/indexer/tracker_indexer.rs +++ b/src/indexer/tracker_indexer.rs @@ -1,5 +1,7 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; +use diesel::{SqliteConnection, r2d2::ConnectionManager}; +use r2d2::Pool; use tokio::{sync::mpsc::Sender, time::Instant}; use bitcoincore_rpc::bitcoin::absolute::{Height, LockTime}; @@ -7,18 +9,28 @@ use tracing::info; use super::rpc::BitcoinRpc; use crate::{ - handle_result, status, + handle_result, + indexer::utxo_indexer::Indexer, + status, types::{DbRequest, ServerInfo}, }; -pub async fn run(db_tx: Sender, status_tx: status::Sender, client: BitcoinRpc) { +pub async fn run( + pool: Arc>>, + db_tx: Sender, + status_tx: status::Sender, + client: BitcoinRpc, +) { info!("Indexer started"); + let mut utxo_indexer = Indexer::new(pool, &client); let mut last_tip = 0; loop { let blockchain_info = handle_result!(status_tx, client.get_blockchain_info()); let tip_height = blockchain_info.blocks + 1; + utxo_indexer.process_mempool(); for height in last_tip..tip_height { + utxo_indexer.process_block(height); let block_hash = handle_result!(status_tx, client.get_block_hash(height)); let block = handle_result!(status_tx, client.get_block(block_hash)); for tx in block.txdata { diff --git a/src/indexer/utxo_indexer.rs b/src/indexer/utxo_indexer.rs index 6dd4f88..c28beb5 100644 --- a/src/indexer/utxo_indexer.rs +++ b/src/indexer/utxo_indexer.rs @@ -1,22 +1,27 @@ -use chrono::NaiveDateTime; -use diesel::prelude::*; -use diesel::SqliteConnection; -use crate::db::schema::{mempool_inputs, mempool_tx, utxos}; +use std::sync::Arc; + use crate::db::model::{MempoolInput, MempoolTx, Utxo}; +use crate::db::schema::{mempool_inputs, mempool_tx, utxos}; use crate::indexer::rpc::BitcoinRpc; +use chrono::NaiveDateTime; +use diesel::SqliteConnection; +use diesel::prelude::*; +use diesel::r2d2::ConnectionManager; +use r2d2::Pool; pub struct Indexer<'a> { - conn: &'a mut SqliteConnection, + conn: Arc>>, rpc: &'a BitcoinRpc, } impl<'a> Indexer<'a> { - pub fn new(conn: &'a mut SqliteConnection, rpc: &'a BitcoinRpc) -> Self { + pub fn new(conn: Arc>>, rpc: &'a BitcoinRpc) -> Self { Self { conn, rpc } } pub fn process_mempool(&mut self) { let txids = self.rpc.get_raw_mempool().unwrap(); + let mut conn = self.conn.get().expect("Failed to get DB connection"); for txid in txids { let tx = self.rpc.get_raw_tx(&txid).unwrap(); @@ -31,10 +36,15 @@ impl<'a> Indexer<'a> { input_txid: prevout.txid.to_string(), input_vout: prevout.vout as i32, }) - .execute(self.conn).unwrap(); - - self.mark_utxo_spent(&txid.to_string(), prevout.vout as i32, Some(&txid.to_string()), false); - + .execute(&mut conn) + .unwrap(); + + self.mark_utxo_spent( + &txid.to_string(), + prevout.vout as i32, + Some(&txid.to_string()), + false, + ); } for (vout, out) in tx.output.iter().enumerate() { @@ -50,19 +60,26 @@ impl<'a> Indexer<'a> { }; diesel::insert_or_ignore_into(utxos::table) .values(&utxo) - .execute(self.conn).unwrap(); + .execute(&mut conn) + .unwrap(); } } } pub fn process_block(&mut self, height: u64) { let block_hash = self.rpc.get_block_hash(height).unwrap(); - let block = self.rpc.get_block(block_hash).unwrap(); + let block = self.rpc.get_block(block_hash).unwrap(); + let mut conn = self.conn.get().expect("Failed to get DB connection"); for tx in block.txdata.iter() { for input in &tx.input { let prevout = &input.previous_output; - self.mark_utxo_spent(&prevout.txid.to_string(), prevout.vout as i32, Some(&tx.compute_txid().to_string()), true); + self.mark_utxo_spent( + &prevout.txid.to_string(), + prevout.vout as i32, + Some(&tx.compute_txid().to_string()), + true, + ); } for (vout, out) in tx.output.iter().enumerate() { @@ -78,18 +95,21 @@ impl<'a> Indexer<'a> { }; diesel::insert_or_ignore_into(utxos::table) .values(&utxo) - .execute(self.conn).unwrap(); + .execute(&mut conn) + .unwrap(); } } } fn insert_mempool_tx(&mut self, txid: &str) { + let mut conn = self.conn.get().expect("Failed to get DB connection"); diesel::insert_or_ignore_into(mempool_tx::table) .values(&MempoolTx { txid: txid.to_string(), seen_at: NaiveDateTime::MIN, }) - .execute(self.conn).unwrap(); + .execute(&mut conn) + .unwrap(); } fn mark_utxo_spent( @@ -98,8 +118,9 @@ impl<'a> Indexer<'a> { vout: i32, spent_by: Option<&str>, is_confirmed_spend: bool, - ){ + ) { use utxos::dsl; + let mut conn = self.conn.get().expect("Failed to get DB connection"); let confirmed_value = is_confirmed_spend; @@ -109,7 +130,7 @@ impl<'a> Indexer<'a> { dsl::spent_by_txid.eq(spent_by.map(str::to_string)), dsl::confirmed.eq(confirmed_value), )) - .execute(self.conn).unwrap(); - + .execute(&mut conn) + .unwrap(); } } diff --git a/src/main.rs b/src/main.rs index c9d461f..46353e9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -107,6 +107,18 @@ impl From for Client { } } +use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; + +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); + +fn run_migrations(pool: Arc>>) { + let mut conn = pool + .get() + .expect("Failed to get DB connection for migrations"); + conn.run_pending_migrations(MIGRATIONS) + .expect("Migration failed"); +} + #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); @@ -117,12 +129,16 @@ async fn main() { info!("Connecting to indexer db"); let database_url = format!("{}/tracker.db", args.datadir); + if let Some(parent) = Path::new(&database_url).parent() { + std::fs::create_dir_all(parent).expect("Failed to create database directory"); + } let manager = ConnectionManager::::new(database_url); let pool = Arc::new( Pool::builder() .build(manager) .expect("Failed to create DB pool"), ); + run_migrations(pool.clone()); info!("Connected to indexer db"); check_tor_status(args.control_port, &args.tor_auth_password) @@ -155,7 +171,13 @@ async fn main() { let server_address = args.address.clone(); spawn_db_manager(pool.clone(), db_rx, status_tx.clone()).await; - spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), rpc_config.clone().into()).await; + spawn_mempool_indexer( + pool.clone(), + db_tx.clone(), + status_tx.clone(), + rpc_config.clone().into(), + ) + .await; spawn_server( db_tx.clone(), status_tx.clone(), @@ -183,7 +205,7 @@ async fn main() { State::MempoolShutdown(err) => { warn!("Mempool Indexer crashed. Restarting... Error: {:?}", err); let client: Client = rpc_config.clone().into(); - spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), client).await; + spawn_mempool_indexer(pool.clone(), db_tx.clone(), status_tx.clone(), client).await; } State::ServerShutdown(err) => { warn!("Server crashed. Restarting... Error: {:?}", err); @@ -209,12 +231,14 @@ async fn spawn_db_manager( } async fn spawn_mempool_indexer( + pool: Arc>>, db_tx: Sender, status_tx: Sender, client: Client, ) { info!("Spawning indexer"); tokio::spawn(indexer::run( + pool, db_tx, status::Sender::Mempool(status_tx), client.into(), diff --git a/src/server/tracker_server.rs b/src/server/tracker_server.rs index 367310c..aac3cb5 100644 --- a/src/server/tracker_server.rs +++ b/src/server/tracker_server.rs @@ -1,3 +1,4 @@ +use crate::db::model::MempoolTx; use crate::server::tracker_monitor::monitor_systems; use crate::status; use crate::types::DbRequest; @@ -94,6 +95,29 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { TrackerRequest::Pong { address: _ } => { todo!() } + TrackerRequest::Watch { outpoint } => { + info!("Received a watch request from client: {outpoint:?}"); + + let (resp_tx, mut resp_rx) = mpsc::channel::>(1); + + let db_request = DbRequest::WatchUtxo(outpoint, resp_tx); + + if let Err(e) = db_tx.send(db_request).await { + error!("Failed to send DB request: {e}"); + break; + } + + let response = resp_rx.recv().await; + info!("Response: {:?}", response); + + if let Some(mempool_tx) = response { + let message = TrackerResponse::WatchResponse { mempool_tx }; + if let Err(e) = send_message(&mut writer, &message).await { + error!("Failed to send response to client: {e}"); + break; + } + } + } } } diff --git a/src/types.rs b/src/types.rs index 9ce9060..2017ccf 100644 --- a/src/types.rs +++ b/src/types.rs @@ -5,6 +5,8 @@ use bitcoincore_rpc::bitcoin::{ use serde::{Deserialize, Serialize}; use tokio::{sync::mpsc::Sender, time::Instant}; +use crate::db::model::MempoolTx; + #[derive(Debug, Clone)] pub struct ServerInfo { pub onion_address: String, @@ -18,6 +20,7 @@ pub enum DbRequest { Update(String, ServerInfo), QueryAll(Sender>), QueryActive(Sender>), + WatchUtxo(OutPoint, Sender>), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Hash)] @@ -63,11 +66,17 @@ pub enum TrackerRequest { /// A request sent by the taker to fetch all valid maker addresses from the DNS server. Get, /// To gauge server activity - Pong { address: String }, + Pong { + address: String, + }, + Watch { + outpoint: OutPoint, + }, } #[derive(Serialize, Deserialize, Debug)] pub enum TrackerResponse { Address { addresses: Vec }, Ping, + WatchResponse { mempool_tx: Vec }, } diff --git a/tracker.db b/tracker.db deleted file mode 100644 index e8e1f9c397659433f3e99a8a8f695a80fe17c71b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 40960 zcmeI(zf$8y9Kdl2gAL9BmnJG`mYEc21~S+j!{joViOvy(#F&dPxj)U0u@}&sZ8?%m zd?n=it5fG??gi4M%mbuOnYC<;b)2q97teRG&`P_Ke*4+AX2hbCAFF<>#CZ^Q8nM_h z9vY@;d?SQm7;}0l>g6^q=*;}>hF+Uf`K;`m@x`<6mskHWmWuC;m3OPZuNW)8J$SqP z$0vV1c&T@h5I_I{1Q0*~0R#~EPXagAQo*uq^H)D^oV8U)b$UV2mVUR_kE8TfvEsQ- z-4%7`N!1nUGOiIOCF<_;x~LuN<+NJeO!g1`NU5&W&kgm%Blo2496g;hxM=Sz z=B&Fb)=*K~_E$^-_OLzIDo617LvMlqBN}^4gll;o3VoY}33oqSQ zAL)myM!T;v`Y39KelM22{@E+_`a@%&8FbJ6u%lYy$?YX{9&@>2O1my=;NDw*mHk?wLkpSQlYvgi{$SxQ!g z>(6opYi-TE$tB>C3a?ZcjaTz2^zqIyd(ia*-S@3lsG?}(G!@V&!lwQTwt`=}GE!Qt z73tybR3}_xp}5gj_k@b})_mS_*0S0#jzyNnnT6};X2EhC^X99h9a6S@6{)t2nwP56 zkRAUbY{WX^D7$1X)!OWOV*q`d7*gqTik}>>@}1*t>~7m)3gNw$>I1@lSL*urzb=!Z zQ#tkY{h%%T;95HvLv$Ol%?#loW98}4sW!}dNEi3H1vmr00IagfB*srAb|EkA@Tc=IY;u{*Vwr z009ILKmY**5I_I{1Q0-ACImjs*-Pm^2@L1|^*{ZS5I_I{1Q0*~0R#|0009ILKwyRi zk{A4W{-5C{Y&c*KmY**5I_I{1Q0*~0R#{j3h@3v;{X8!5I_I{1Q0*~0R#|00D;*TVE%vh v` Date: Sat, 26 Jul 2025 10:13:40 +0530 Subject: [PATCH 7/8] correct the server message naming --- src/server/tracker_monitor.rs | 8 ++++---- src/server/tracker_server.rs | 18 +++++++++--------- src/types.rs | 4 ++-- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/server/tracker_monitor.rs b/src/server/tracker_monitor.rs index cbbf3a1..d345b48 100644 --- a/src/server/tracker_monitor.rs +++ b/src/server/tracker_monitor.rs @@ -13,7 +13,7 @@ use crate::{ handle_result, server::send_message_with_prefix, status, - types::{DbRequest, ServerInfo, TrackerRequest, TrackerResponse}, + types::{DbRequest, ServerInfo, TrackerClientToServer, TrackerServerToClient}, utils::read_message, }; @@ -59,11 +59,11 @@ pub async fn monitor_systems( let mut writer = BufWriter::new(write_half); - let message = TrackerResponse::Ping; + let message = TrackerServerToClient::Ping; _ = send_message_with_prefix(&mut writer, &message).await; let buffer = handle_result!(status_tx, read_message(&mut reader).await); - let response: TrackerRequest = + let response: TrackerClientToServer = match serde_cbor::de::from_reader(&buffer[..]) { Ok(resp) => resp, Err(e) => { @@ -73,7 +73,7 @@ pub async fn monitor_systems( } }; - if let TrackerRequest::Pong { address } = response { + if let TrackerClientToServer::Pong { address } = response { let updated_info = ServerInfo { onion_address: address.clone(), cooldown: Instant::now(), diff --git a/src/server/tracker_server.rs b/src/server/tracker_server.rs index aac3cb5..816bc72 100644 --- a/src/server/tracker_server.rs +++ b/src/server/tracker_server.rs @@ -2,8 +2,8 @@ use crate::db::model::MempoolTx; use crate::server::tracker_monitor::monitor_systems; use crate::status; use crate::types::DbRequest; -use crate::types::TrackerRequest; -use crate::types::TrackerResponse; +use crate::types::TrackerClientToServer; +use crate::types::TrackerServerToClient; use crate::utils::read_message; use crate::utils::send_message; use tokio::io::BufReader; @@ -57,7 +57,7 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { } }; - let request: TrackerRequest = match serde_cbor::de::from_reader(&buffer[..]) { + let request: TrackerClientToServer = match serde_cbor::de::from_reader(&buffer[..]) { Ok(r) => r, Err(e) => { error!("Failed to deserialize client request: {e}"); @@ -66,7 +66,7 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { }; match request { - TrackerRequest::Get => { + TrackerClientToServer::Get => { info!("Received Get request taker"); let (resp_tx, mut resp_rx) = mpsc::channel(1); let db_request = DbRequest::QueryActive(resp_tx); @@ -80,7 +80,7 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { info!("Response: {:?}", response); if let Some(addresses) = response { - let message = TrackerResponse::Address { addresses }; + let message = TrackerServerToClient::Address { addresses }; if let Err(e) = send_message(&mut writer, &message).await { error!("Failed to send response to client: {e}"); break; @@ -88,14 +88,14 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { } } - TrackerRequest::Post { metadata: _ } => { + TrackerClientToServer::Post { metadata: _ } => { todo!() } - TrackerRequest::Pong { address: _ } => { + TrackerClientToServer::Pong { address: _ } => { todo!() } - TrackerRequest::Watch { outpoint } => { + TrackerClientToServer::Watch { outpoint } => { info!("Received a watch request from client: {outpoint:?}"); let (resp_tx, mut resp_rx) = mpsc::channel::>(1); @@ -111,7 +111,7 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { info!("Response: {:?}", response); if let Some(mempool_tx) = response { - let message = TrackerResponse::WatchResponse { mempool_tx }; + let message = TrackerServerToClient::WatchResponse { mempool_tx }; if let Err(e) = send_message(&mut writer, &message).await { error!("Failed to send response to client: {e}"); break; diff --git a/src/types.rs b/src/types.rs index 2017ccf..3fcbe01 100644 --- a/src/types.rs +++ b/src/types.rs @@ -57,7 +57,7 @@ pub struct DnsMetadata { #[derive(Serialize, Deserialize, Debug)] #[allow(clippy::large_enum_variant)] -pub enum TrackerRequest { +pub enum TrackerClientToServer { /// A request sent by the maker to register itself with the DNS server and authenticate. Post { /// Metadata containing the maker's URL and fidelity proof. @@ -75,7 +75,7 @@ pub enum TrackerRequest { } #[derive(Serialize, Deserialize, Debug)] -pub enum TrackerResponse { +pub enum TrackerServerToClient { Address { addresses: Vec }, Ping, WatchResponse { mempool_tx: Vec }, From 155f0ef51d6e072399f879588f87aa3cdb46769d Mon Sep 17 00:00:00 2001 From: Wukong Date: Mon, 28 Jul 2025 11:11:08 +0530 Subject: [PATCH 8/8] improve messages --- src/main.rs | 4 ++++ src/server/tracker_monitor.rs | 7 ++++++- src/server/tracker_server.rs | 7 +++++++ src/types.rs | 2 +- 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 46353e9..c0c6ebc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -183,6 +183,7 @@ async fn main() { status_tx.clone(), server_address.clone(), args.socks_port, + hostname.clone(), ) .await; @@ -214,6 +215,7 @@ async fn main() { status_tx.clone(), server_address.clone(), args.socks_port, + hostname.clone(), ) .await; } @@ -250,6 +252,7 @@ async fn spawn_server( status_tx: Sender, address: String, socks_port: u16, + hostname: String, ) { info!("Spawning server instance"); tokio::spawn(server::run( @@ -257,5 +260,6 @@ async fn spawn_server( status::Sender::Server(status_tx), address, socks_port, + hostname, )); } diff --git a/src/server/tracker_monitor.rs b/src/server/tracker_monitor.rs index d345b48..9719045 100644 --- a/src/server/tracker_monitor.rs +++ b/src/server/tracker_monitor.rs @@ -24,6 +24,8 @@ pub async fn monitor_systems( db_tx: Sender, status_tx: status::Sender, socks_port: u16, + onion_address: String, + port: u16, ) -> Result<(), TrackerError> { info!("Starting to monitor other maker services"); @@ -59,7 +61,10 @@ pub async fn monitor_systems( let mut writer = BufWriter::new(write_half); - let message = TrackerServerToClient::Ping; + let message = TrackerServerToClient::Ping { + address: onion_address.clone(), + port, + }; _ = send_message_with_prefix(&mut writer, &message).await; let buffer = handle_result!(status_tx, read_message(&mut reader).await); diff --git a/src/server/tracker_server.rs b/src/server/tracker_server.rs index 816bc72..37c8a8d 100644 --- a/src/server/tracker_server.rs +++ b/src/server/tracker_server.rs @@ -20,13 +20,20 @@ pub async fn run( status_tx: status::Sender, address: String, socks_port: u16, + onion_address: String, ) -> Result<(), Box> { + let port = address + .rsplit_once(':') + .and_then(|(_, port)| port.parse::().ok()) + .unwrap_or(8080); let server = TcpListener::bind(&address).await?; tokio::spawn(monitor_systems( db_tx.clone(), status_tx.clone(), socks_port, + onion_address, + port, )); info!("Tracker server listening on {}", address); diff --git a/src/types.rs b/src/types.rs index 3fcbe01..13fbd26 100644 --- a/src/types.rs +++ b/src/types.rs @@ -77,6 +77,6 @@ pub enum TrackerClientToServer { #[derive(Serialize, Deserialize, Debug)] pub enum TrackerServerToClient { Address { addresses: Vec }, - Ping, + Ping { address: String, port: u16 }, WatchResponse { mempool_tx: Vec }, }