Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/be/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Transaction {
}
}

impl<'a> TransactionRef<'a> {
impl TransactionRef<'_> {
pub fn txid(&self) -> crate::be::Txid {
match self {
TransactionRef::Bitcoin(tx) => tx.compute_txid().into(),
Expand Down Expand Up @@ -160,7 +160,7 @@ impl<'a> Iterator for InputIterator<'a> {
}
}

impl<'a> OutputRef<'a> {
impl OutputRef<'_> {
pub(crate) fn skip_utxo(&self) -> bool {
match self {
OutputRef::Bitcoin(output) => output.script_pubkey.is_op_return(),
Expand Down Expand Up @@ -201,7 +201,7 @@ impl<'a> OutputRef<'a> {
}
}

impl<'a> InputRef<'a> {
impl InputRef<'_> {
pub(crate) fn skip_indexing(&self) -> bool {
match self {
InputRef::Bitcoin(_) => false,
Expand Down
2 changes: 1 addition & 1 deletion src/be/txid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl<'de> Deserialize<'de> for Txid {
if deserializer.is_human_readable() {
struct TxidVisitor;

impl<'de> Visitor<'de> for TxidVisitor {
impl Visitor<'_> for TxidVisitor {
type Value = Txid;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
Expand Down
24 changes: 11 additions & 13 deletions src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl Client {
.with_context(|| format!("failing converting {text} to ChainInfo"))?;
Ok(Some(chain_info))
} else {
return Err(Error::UnexpectedStatus(url, status).into());
Err(Error::UnexpectedStatus(url, status).into())
}
}

Expand Down Expand Up @@ -374,18 +374,16 @@ impl Client {
let content: HashSet<crate::be::Txid> = serde_json::from_slice(&body_bytes)
.with_context(|| format!("failure converting {url} body in HashSet<Txid>"))?;
content
} else if support_verbose {
serde_json::from_slice(&body_bytes)
.with_context(|| format!("failure converting {url} body in HashSet<Txid> "))?
} else {
if support_verbose {
serde_json::from_slice(&body_bytes)
.with_context(|| format!("failure converting {url} body in HashSet<Txid> "))?
} else {
let content: HashMap<crate::be::Txid, Empty> = serde_json::from_slice(&body_bytes)
.with_context(|| {
format!("failure converting {url} body in HashMap<Txid, Empty> ")
})?;
let content: HashMap<crate::be::Txid, Empty> = serde_json::from_slice(&body_bytes)
.with_context(|| {
format!("failure converting {url} body in HashMap<Txid, Empty> ")
})?;

content.into_keys().collect()
}
content.into_keys().collect()
})
}

Expand Down Expand Up @@ -477,11 +475,11 @@ impl Client {
if let Some(header) = self.block_header_json(last.hash, family).await? {
if let Some(next) = header.nextblockhash {
let header = self.block_header(next, family).await?;
return Ok(ChainStatus::NewBlock(BlockMeta::new(
Ok(ChainStatus::NewBlock(BlockMeta::new(
last.height + 1,
next,
header.time(),
)));
)))
} else {
Ok(ChainStatus::Tip)
}
Expand Down
12 changes: 10 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ impl std::fmt::Debug for Arguments {
.field("enable_db_statistics", &self.enable_db_statistics)
.field("cache_control_seconds", &self.cache_control_seconds)
.field("request_timeout_seconds", &self.request_timeout_seconds)
.field("header_read_timeout_seconds", &self.header_read_timeout_seconds);
.field(
"header_read_timeout_seconds",
&self.header_read_timeout_seconds,
);

#[cfg(feature = "db")]
{
Expand Down Expand Up @@ -452,7 +455,12 @@ pub async fn inner_main(
.await;

if let Err(err) = result {
log::error!("Error serving connection: {:?}", err);
let msg = format!("{err:?}");
if msg.contains("HeaderTimeout") {
log::warn!("Header read timeout (possible scanner/slowloris): {msg}");
} else {
log::error!("Error serving connection: {msg}");
}
}
});
},
Expand Down
2 changes: 1 addition & 1 deletion src/server/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ async fn handle_waterfalls_req(
txs_seen: map,
page,
tip: tip_hash,
tip_meta: tip_meta,
tip_meta,
};
let content = if cbor {
"application/cbor"
Expand Down
2 changes: 1 addition & 1 deletion src/server/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl State {
}
pub async fn set_hash_ts(&self, meta: &BlockMeta) {
let mut blocks_hash_ts = self.blocks_hash_ts.lock().await;
update_hash_ts(&mut *blocks_hash_ts, meta);
update_hash_ts(&mut blocks_hash_ts, meta);
}
pub fn address(&self) -> bitcoin::Address {
p2pkh(&self.secp, &self.wif_key)
Expand Down
90 changes: 55 additions & 35 deletions src/store/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
};

use crate::{
error_panic,
store::{BlockMeta, Store, TxSeen},
Height, ScriptHash,
};
Expand Down Expand Up @@ -267,6 +268,7 @@ impl DBStore {

/// Remove UTXOs from the database and return their script hashes.
/// This writes immediately to the database (non-atomic with other operations).
#[cfg(test)]
fn remove_utxos(&self, outpoints: &[OutPoint]) -> Result<Vec<(OutPoint, ScriptHash)>> {
let result = self.get_utxos_for_spending(outpoints)?;

Expand Down Expand Up @@ -301,7 +303,11 @@ impl DBStore {
Ok(())
}

fn remove_history_entries(&self, to_remove: &BTreeMap<ScriptHash, Vec<TxSeen>>) -> Result<()> {
fn remove_history_entries_batch(
&self,
batch: &mut rocksdb::WriteBatch,
to_remove: &BTreeMap<ScriptHash, Vec<TxSeen>>,
) -> Result<()> {
if to_remove.is_empty() {
return Ok(());
}
Expand All @@ -314,8 +320,6 @@ impl DBStore {
// Read current history for these script hashes
let current_history = self.get_history(&script_hashes)?;

let estimate_size = estimate_history_size(to_remove);
let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimate_size);
let cf = self.history_cf();

for (i, script_hash) in script_hashes.iter().enumerate() {
Expand Down Expand Up @@ -344,7 +348,6 @@ impl DBStore {
);
}
}
self.write(batch)?;
Ok(())
}

Expand Down Expand Up @@ -454,11 +457,46 @@ impl DBStore {
}

fn write(&self, batch: rocksdb::WriteBatch) -> Result<()> {
Ok(if self.ibd.load(Ordering::Relaxed) {
if self.ibd.load(Ordering::Relaxed) {
self.db.write_without_wal(batch)?
} else {
self.db.write(batch)?
})
};
Ok(())
}

fn _reorg(&self) -> Result<()> {
let reorg_data = self
.reorg_data
.lock()
.map_err(|e| anyhow::anyhow!("reorg_data lock poisoned: {e}"))?;

let mut batch = rocksdb::WriteBatch::default();

// Restore UTXOs that were spent in the reorged block
self.insert_utxos(
&mut batch,
reorg_data
.spent
.iter()
.map(|(outpoint, script_hash)| (outpoint, script_hash)),
)?;

// Remove UTXOs that were created in the reorged block
if !reorg_data.utxos_created.is_empty() {
let outpoints_to_remove: Vec<OutPoint> =
reorg_data.utxos_created.keys().cloned().collect();
self.delete_utxos_batch(&mut batch, &outpoints_to_remove)?;
}

// Remove history entries that were added in the reorged block
if !reorg_data.history.is_empty() {
self.remove_history_entries_batch(&mut batch, &reorg_data.history)?;
}

self.write(batch)?;

Ok(())
}
}

Expand Down Expand Up @@ -598,39 +636,21 @@ impl Store for DBStore {
}

fn reorg(&self) {
let reorg_data = self.reorg_data.lock().unwrap();

// Estimate batch size for UTXO restoration
let utxo_restore_size = reorg_data.spent.len() * 44; // 44 bytes per UTXO entry
let mut batch = rocksdb::WriteBatch::with_capacity_bytes(utxo_restore_size);

// Restore UTXOs that were spent in the reorged block
self.insert_utxos(
&mut batch,
reorg_data
.spent
.iter()
.map(|(outpoint, script_hash)| (outpoint, script_hash)),
)
.unwrap(); // TODO handle unwrap;

self.write(batch).unwrap(); // TODO handle unwrap;

// Remove UTXOs that were created in the reorged block
if !reorg_data.utxos_created.is_empty() {
let outpoints_to_remove: Vec<OutPoint> =
reorg_data.utxos_created.keys().cloned().collect();
self.remove_utxos(&outpoints_to_remove).unwrap(); // TODO handle unwrap;
}

// Remove history entries that were added in the reorged block
if !reorg_data.history.is_empty() {
self.remove_history_entries(&reorg_data.history).unwrap(); // TODO handle unwrap;
if let Err(e) = self._reorg() {
error_panic!("reorg failed: {e}");
}
}

fn ibd_finished(&self) {
log::info!("Initial block download finished, setting ibd to false in the store");
log::info!("Initial block download finished, flushing memtables before enabling WAL...");
for cf_name in COLUMN_FAMILIES {
if let Some(cf) = self.db.cf_handle(cf_name) {
self.db
.flush_cf(&cf)
.unwrap_or_else(|e| log::error!("failed to flush CF {cf_name}: {e}"));
}
}
log::info!("Memtables flushed, setting ibd to false");
self.ibd.store(false, Ordering::Relaxed);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/threads/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn get_next_block_to_index(
) -> Option<BlockMeta> {
match last_indexed.as_ref() {
Some(last) => {
match client.get_next(&last, family).await {
match client.get_next(last, family).await {
Ok(ChainStatus::NewBlock(next)) => Some(next),
Ok(ChainStatus::Reorg) => {
log::warn!("reorg happened! {last:?} removed from the chain");
Expand Down
2 changes: 1 addition & 1 deletion src/threads/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async fn sync_mempool_once(

let db = &state.store;
let tip = state.tip_height().await;
let new: Vec<_> = current.difference(&mempool_txids).collect();
let new: Vec<_> = current.difference(mempool_txids).collect();
let removed: Vec<_> = mempool_txids.difference(&current).cloned().collect();
if !new.is_empty() {
log::debug!("new txs in mempool {:?}, tip: {tip:?}", new);
Expand Down