Skip to content

Commit 3b6403e

Browse files
authored
Merge branch 'main' into 2025-12-15-fix-ci
2 parents 36ce922 + 45d9b49 commit 3b6403e

File tree

13 files changed

+346
-137
lines changed

13 files changed

+346
-137
lines changed

.github/workflows/build-cli-binary.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ jobs:
2424
submodules: recursive
2525
ssh-key: ${{ secrets.PUBLISH_PRIVATE_KEY }}
2626

27+
- name: Free disk space
28+
uses: jlumbroso/free-disk-space@v1.3.1
29+
2730
- uses: nixbuild/nix-quick-install-action@v30
2831
with:
2932
nix_conf: |

crates/cli/src/commands/local_db/executor.rs

Lines changed: 89 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use serde_json::{json, Map, Value};
88
use std::convert::TryFrom;
99
use std::path::{Path, PathBuf};
1010
use std::time::Duration;
11+
use tokio::task::spawn_blocking;
1112

1213
pub struct RusqliteExecutor {
1314
db_path: PathBuf,
@@ -32,17 +33,6 @@ impl RusqliteExecutor {
3233
}
3334
}
3435

35-
fn open_connection(&self) -> Result<Connection, LocalDbQueryError> {
36-
let conn = Connection::open(&self.db_path)
37-
.map_err(|e| LocalDbQueryError::database(format!("Failed to open database: {e}")))?;
38-
conn.busy_timeout(Duration::from_millis(500))
39-
.map_err(|e| LocalDbQueryError::database(format!("Failed to set busy_timeout: {e}")))?;
40-
functions::register_all(&conn).map_err(|e| {
41-
LocalDbQueryError::database(format!("Failed to register sqlite functions: {e}"))
42-
})?;
43-
Ok(conn)
44-
}
45-
4636
fn invoke_statement(conn: &Connection, stmt: &SqlStatement) -> Result<(), LocalDbQueryError> {
4737
if stmt.params().is_empty() {
4838
conn.execute_batch(stmt.sql())
@@ -61,6 +51,21 @@ impl RusqliteExecutor {
6151
}
6252
}
6353

54+
fn open_connection(db_path: &Path) -> Result<Connection, LocalDbQueryError> {
55+
let conn = Connection::open(db_path)
56+
.map_err(|e| LocalDbQueryError::database(format!("Failed to open database: {e}")))?;
57+
conn.busy_timeout(Duration::from_millis(500))
58+
.map_err(|e| LocalDbQueryError::database(format!("Failed to set busy_timeout: {e}")))?;
59+
functions::register_all(&conn).map_err(|e| {
60+
LocalDbQueryError::database(format!("Failed to register sqlite functions: {e}"))
61+
})?;
62+
Ok(conn)
63+
}
64+
65+
fn join_err(err: tokio::task::JoinError) -> LocalDbQueryError {
66+
LocalDbQueryError::database(format!("Blocking task failed: {err}"))
67+
}
68+
6469
#[async_trait(?Send)]
6570
impl LocalDbQueryExecutor for RusqliteExecutor {
6671
async fn execute_batch(&self, batch: &SqlStatementBatch) -> Result<(), LocalDbQueryError> {
@@ -70,74 +75,92 @@ impl LocalDbQueryExecutor for RusqliteExecutor {
7075
));
7176
}
7277

73-
let conn = self.open_connection()?;
74-
75-
for stmt in batch {
76-
if let Err(err) = Self::invoke_statement(&conn, stmt) {
77-
let _ = conn.execute_batch("ROLLBACK");
78-
return Err(err);
78+
let db_path = self.db_path.clone();
79+
let batch = batch.clone();
80+
spawn_blocking(move || {
81+
let conn = open_connection(&db_path)?;
82+
for stmt in &batch {
83+
if let Err(err) = RusqliteExecutor::invoke_statement(&conn, stmt) {
84+
let _ = conn.execute_batch("ROLLBACK");
85+
return Err(err);
86+
}
7987
}
80-
}
81-
82-
Ok(())
88+
Ok(())
89+
})
90+
.await
91+
.map_err(join_err)?
8392
}
8493

8594
async fn query_text(&self, stmt: &SqlStatement) -> Result<String, LocalDbQueryError> {
86-
let conn = self.open_connection()?;
87-
Self::invoke_statement(&conn, stmt)?;
88-
Ok(String::new())
95+
let db_path = self.db_path.clone();
96+
let stmt = stmt.clone();
97+
spawn_blocking(move || {
98+
let conn = open_connection(&db_path)?;
99+
RusqliteExecutor::invoke_statement(&conn, &stmt)?;
100+
Ok(String::new())
101+
})
102+
.await
103+
.map_err(join_err)?
89104
}
90105

91106
async fn query_json<T>(&self, stmt: &SqlStatement) -> Result<T, LocalDbQueryError>
92107
where
93108
T: FromDbJson,
94109
{
95-
let conn = self.open_connection()?;
96-
let mut s = conn
97-
.prepare(stmt.sql())
98-
.map_err(|e| LocalDbQueryError::database(format!("Failed to prepare query: {e}")))?;
99-
let column_names: Vec<String> = (0..s.column_count())
100-
.map(|i| {
101-
let raw = s.column_name(i).unwrap_or("");
102-
let trimmed = raw.trim();
103-
if trimmed.is_empty() {
104-
format!("column_{}", i)
105-
} else {
106-
trimmed.to_string()
107-
}
108-
})
109-
.collect();
110+
let db_path = self.db_path.clone();
111+
let stmt = stmt.clone();
110112

111-
let bound = stmt.params().iter().cloned().map(sqlvalue_to_rusqlite);
112-
let params = rusqlite::params_from_iter(bound);
113-
114-
let rows_iter = s
115-
.query_map(params, |row| {
116-
let mut obj = Map::with_capacity(column_names.len());
117-
for (i, name) in column_names.iter().enumerate() {
118-
let v = match row.get_ref(i)? {
119-
ValueRef::Null => Value::Null,
120-
ValueRef::Integer(n) => json!(n),
121-
ValueRef::Real(f) => json!(f),
122-
ValueRef::Text(bytes) => match std::str::from_utf8(bytes) {
123-
Ok(s) => json!(s),
124-
Err(_) => json!(alloy::hex::encode_prefixed(bytes)),
125-
},
126-
ValueRef::Blob(bytes) => json!(alloy::hex::encode_prefixed(bytes)),
127-
};
128-
obj.insert(name.clone(), v);
129-
}
130-
Ok(Value::Object(obj))
131-
})
132-
.map_err(|e| LocalDbQueryError::database(format!("Query failed: {e}")))?;
113+
let json_value = spawn_blocking(move || {
114+
let conn = open_connection(&db_path)?;
115+
let mut s = conn.prepare(stmt.sql()).map_err(|e| {
116+
LocalDbQueryError::database(format!("Failed to prepare query: {e}"))
117+
})?;
118+
let column_names: Vec<String> = (0..s.column_count())
119+
.map(|i| {
120+
let raw = s.column_name(i).unwrap_or("");
121+
let trimmed = raw.trim();
122+
if trimmed.is_empty() {
123+
format!("column_{}", i)
124+
} else {
125+
trimmed.to_string()
126+
}
127+
})
128+
.collect();
133129

134-
let mut out: Vec<Value> = Vec::new();
135-
for r in rows_iter {
136-
let v = r.map_err(|e| LocalDbQueryError::database(format!("Row error: {e}")))?;
137-
out.push(v);
138-
}
130+
let bound = stmt.params().iter().cloned().map(sqlvalue_to_rusqlite);
131+
let params = rusqlite::params_from_iter(bound);
132+
133+
let rows_iter = s
134+
.query_map(params, |row| {
135+
let mut obj = Map::with_capacity(column_names.len());
136+
for (i, name) in column_names.iter().enumerate() {
137+
let v = match row.get_ref(i)? {
138+
ValueRef::Null => Value::Null,
139+
ValueRef::Integer(n) => json!(n),
140+
ValueRef::Real(f) => json!(f),
141+
ValueRef::Text(bytes) => match std::str::from_utf8(bytes) {
142+
Ok(s) => json!(s),
143+
Err(_) => json!(alloy::hex::encode_prefixed(bytes)),
144+
},
145+
ValueRef::Blob(bytes) => json!(alloy::hex::encode_prefixed(bytes)),
146+
};
147+
obj.insert(name.clone(), v);
148+
}
149+
Ok(Value::Object(obj))
150+
})
151+
.map_err(|e| LocalDbQueryError::database(format!("Query failed: {e}")))?;
152+
153+
let mut out: Vec<Value> = Vec::new();
154+
for r in rows_iter {
155+
let v = r.map_err(|e| LocalDbQueryError::database(format!("Row error: {e}")))?;
156+
out.push(v);
157+
}
158+
159+
Ok::<_, LocalDbQueryError>(Value::Array(out))
160+
})
161+
.await
162+
.map_err(join_err)??;
139163

140-
let json_value = Value::Array(out);
141164
serde_json::from_value::<T>(json_value)
142165
.map_err(|e| LocalDbQueryError::deserialization(e.to_string()))
143166
}

crates/cli/src/commands/local_db/pipeline/bootstrap.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ impl BootstrapPipeline for ProducerBootstrapAdapter {
2222
self.reset_db(db, None).await?;
2323

2424
if let Some(dump_stmt) = &config.dump_stmt {
25-
db.query_text(dump_stmt).await?;
25+
db.execute_batch(dump_stmt).await?;
2626
}
2727

2828
Ok(())
@@ -190,7 +190,7 @@ mod tests {
190190

191191
let cfg = BootstrapConfig {
192192
ob_id: sample_ob_id(),
193-
dump_stmt: Some(dump_stmt.clone()),
193+
dump_stmt: Some(SqlStatementBatch::from(vec![dump_stmt.clone()])),
194194
latest_block: 0,
195195
block_number_threshold: TEST_BLOCK_NUMBER_THRESHOLD,
196196
deployment_block: 1,
@@ -229,7 +229,7 @@ mod tests {
229229

230230
let cfg = BootstrapConfig {
231231
ob_id: sample_ob_id(),
232-
dump_stmt: Some(dump_stmt.clone()),
232+
dump_stmt: Some(SqlStatementBatch::from(vec![dump_stmt.clone()])),
233233
latest_block: 0,
234234
block_number_threshold: TEST_BLOCK_NUMBER_THRESHOLD,
235235
deployment_block: 1,

0 commit comments

Comments
 (0)