Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 57 additions & 19 deletions src-tauri/src/drivers/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod types;

mod explain;
mod helpers;
mod multi_result;
mod stmt_classify;

#[cfg(test)]
Expand Down Expand Up @@ -1172,6 +1173,7 @@ async fn exec_on_mysql_conn(
affected_rows: exec_result.rows_affected(),
truncated: false,
pagination: None,
additional_results: None,
});
}

Expand All @@ -1192,6 +1194,7 @@ async fn exec_on_mysql_conn(
affected_rows: exec_result.rows_affected(),
truncated: false,
pagination: None,
additional_results: None,
});
}

Expand All @@ -1218,33 +1221,38 @@ async fn exec_on_mysql_conn(
final_query = query.to_string();
}

let mut columns: Vec<String> = Vec::new();
let mut json_rows = Vec::new();
// A single statement may stream back several result sets (e.g. a `CALL`
// whose procedure body holds multiple `SELECT`s), so `fetch_many` is used
// instead of `fetch`: it interleaves rows with one `Either::Left`
// terminator per result set, which the collector folds into discrete sets.
let mut collector = multi_result::ResultSetCollector::new(manual_limit);

// Scope the stream so `conn` borrow is released before returning
{
use futures::stream::StreamExt;
let mut rows_stream = if text.enabled {
use sqlx::Executor;
(&mut *conn).fetch(sqlx::raw_sql(&final_query))
use sqlx::Executor;
let mut event_stream = if text.enabled {
(&mut *conn).fetch_many(sqlx::raw_sql(&final_query))
} else {
sqlx::query(&final_query).fetch(&mut *conn)
(&mut *conn).fetch_many(sqlx::query(&final_query))
};

while let Some(result) = rows_stream.next().await {
while let Some(result) = event_stream.next().await {
match result {
Ok(row) => {
// Initialize columns from the first row
if columns.is_empty() {
columns = row.columns().iter().map(|c| c.name().to_string()).collect();
Ok(sqlx::Either::Left(_)) => collector.end_result_set(),
Ok(sqlx::Either::Right(row)) => {
// Initialize columns from the first row of each result set
if collector.needs_columns() {
collector.set_columns(
row.columns().iter().map(|c| c.name().to_string()).collect(),
);
}

// Check limit (only if manual_limit is set)
if let Some(l) = manual_limit {
if json_rows.len() >= l as usize {
truncated = true;
break;
}
// Past the row cap the row is still consumed (the stream
// must drain to reach later result sets) but not decoded.
if collector.at_limit() {
collector.note_overflow_row();
continue;
}

// Map row using type extraction function
Expand All @@ -1253,12 +1261,41 @@ async fn exec_on_mysql_conn(
let val = extract_value(&row, i, None);
json_row.push(val);
}
json_rows.push(json_row);
collector.push_row(json_row);
}
Err(e) => return Err(e.to_string()),
}
}
} // rows_stream dropped here — conn borrow released
} // event_stream dropped here — conn borrow released

let mut result_sets = collector.finish();
let primary = if result_sets.is_empty() {
multi_result::ResultSetData::default()
} else {
result_sets.remove(0)
};
let columns = primary.columns;
let mut json_rows = primary.rows;
if primary.truncated {
truncated = true;
}
let additional_results = if result_sets.is_empty() {
None
} else {
Some(
result_sets
.into_iter()
.map(|set| QueryResult {
columns: set.columns,
rows: set.rows,
affected_rows: 0,
truncated: set.truncated,
pagination: None,
additional_results: None,
})
.collect(),
)
};

// Apply LIMIT +1 result: if we got page_size+1 rows, has_more=true
if let Some(ref mut p) = pagination {
Expand All @@ -1276,6 +1313,7 @@ async fn exec_on_mysql_conn(
affected_rows: 0,
truncated,
pagination,
additional_results,
})
}

Expand Down
86 changes: 86 additions & 0 deletions src-tauri/src/drivers/mysql/multi_result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! Accumulates the result sets streamed back by a single MySQL statement.
//!
//! A statement is usually one result set, but a `CALL` to a stored procedure
//! may return several (one per `SELECT` in its body). `sqlx`'s `fetch_many`
//! surfaces them as a flat stream of rows interleaved with per-result-set
//! terminators; this collector folds that stream back into discrete sets.

/// One materialized result set: column names plus JSON-encoded rows.
#[derive(Debug, Default)]
pub struct ResultSetData {
pub columns: Vec<String>,
pub rows: Vec<Vec<serde_json::Value>>,
pub truncated: bool,
}

/// Folds a `fetch_many`-style event stream (rows + result-set terminators)
/// into a list of [`ResultSetData`], applying an optional per-set row cap.
///
/// Result sets that produced no rows are dropped: without rows `sqlx` exposes
/// no column metadata, and the trailing `OK` packet of a `CALL` arrives as an
/// empty set too, so an empty set is indistinguishable from "no result set".
/// This mirrors the pre-existing single-set behaviour where a rowless query
/// yielded empty `columns` / `rows`.
pub struct ResultSetCollector {
limit: Option<u32>,
done: Vec<ResultSetData>,
current: ResultSetData,
}

impl ResultSetCollector {
pub fn new(limit: Option<u32>) -> Self {
Self {
limit,
done: Vec::new(),
current: ResultSetData::default(),
}
}

/// True until the first row of the current result set has provided
/// column metadata.
pub fn needs_columns(&self) -> bool {
self.current.columns.is_empty()
}

pub fn set_columns(&mut self, columns: Vec<String>) {
self.current.columns = columns;
}

/// True when the current result set already holds `limit` rows. Callers
/// should still consume (and discard) the remaining rows of the set so
/// that any following result sets are reached.
pub fn at_limit(&self) -> bool {
matches!(self.limit, Some(l) if self.current.rows.len() >= l as usize)
}

/// Appends a row to the current result set, or marks the set as
/// truncated when the row cap has been reached.
pub fn push_row(&mut self, row: Vec<serde_json::Value>) {
if self.at_limit() {
self.current.truncated = true;
} else {
self.current.rows.push(row);
}
}

/// Records that a row beyond the cap was consumed from the wire without
/// being decoded, marking the current result set as truncated.
pub fn note_overflow_row(&mut self) {
self.current.truncated = true;
}

/// Closes the current result set (a `fetch_many` `Left` terminator).
pub fn end_result_set(&mut self) {
if !self.current.rows.is_empty() {
self.done.push(std::mem::take(&mut self.current));
} else {
self.current = ResultSetData::default();
}
}

/// Returns all collected result sets, closing any still-open one.
pub fn finish(mut self) -> Vec<ResultSetData> {
self.end_result_set();
self.done
}
}
115 changes: 115 additions & 0 deletions src-tauri/src/drivers/mysql/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,3 +813,118 @@ mod build_mysql_pk_where_tests {
assert!(build_mysql_pk_where(&pk_map).is_err());
}
}

mod multi_result_collector {
use super::super::multi_result::ResultSetCollector;
use serde_json::json;

fn row(v: i64) -> Vec<serde_json::Value> {
vec![json!(v)]
}

#[test]
fn single_result_set_is_collected() {
let mut c = ResultSetCollector::new(None);
assert!(c.needs_columns());
c.set_columns(vec!["id".to_string()]);
assert!(!c.needs_columns());
c.push_row(row(1));
c.push_row(row(2));
c.end_result_set();

let sets = c.finish();
assert_eq!(sets.len(), 1);
assert_eq!(sets[0].columns, vec!["id".to_string()]);
assert_eq!(sets[0].rows.len(), 2);
assert!(!sets[0].truncated);
}

#[test]
fn multiple_result_sets_are_split_at_terminators() {
let mut c = ResultSetCollector::new(None);
for set in 0..3 {
assert!(c.needs_columns());
c.set_columns(vec![format!("col{set}")]);
c.push_row(row(set));
c.end_result_set();
}

let sets = c.finish();
assert_eq!(sets.len(), 3);
assert_eq!(sets[1].columns, vec!["col1".to_string()]);
assert_eq!(sets[2].rows, vec![row(2)]);
}

#[test]
fn empty_result_sets_are_dropped() {
// A CALL emits a trailing OK packet that surfaces as an empty set;
// rowless SELECTs are indistinguishable from it and dropped too.
let mut c = ResultSetCollector::new(None);
c.set_columns(vec!["id".to_string()]);
c.push_row(row(1));
c.end_result_set();
c.end_result_set();
c.end_result_set();

let sets = c.finish();
assert_eq!(sets.len(), 1);
}

#[test]
fn no_rows_at_all_yields_no_sets() {
let mut c = ResultSetCollector::new(None);
c.end_result_set();
assert!(c.finish().is_empty());
}

#[test]
fn per_set_limit_truncates_each_set_independently() {
let mut c = ResultSetCollector::new(Some(2));
c.set_columns(vec!["id".to_string()]);
c.push_row(row(1));
assert!(!c.at_limit());
c.push_row(row(2));
assert!(c.at_limit());
c.note_overflow_row();
c.end_result_set();

// The cap applies per result set: the next set starts fresh.
c.set_columns(vec!["id".to_string()]);
c.push_row(row(3));
assert!(!c.at_limit());
c.end_result_set();

let sets = c.finish();
assert_eq!(sets.len(), 2);
assert_eq!(sets[0].rows.len(), 2);
assert!(sets[0].truncated);
assert_eq!(sets[1].rows.len(), 1);
assert!(!sets[1].truncated);
}

#[test]
fn push_row_beyond_limit_drops_row_and_marks_truncated() {
let mut c = ResultSetCollector::new(Some(1));
c.set_columns(vec!["id".to_string()]);
c.push_row(row(1));
c.push_row(row(2));
c.end_result_set();

let sets = c.finish();
assert_eq!(sets[0].rows, vec![row(1)]);
assert!(sets[0].truncated);
}

#[test]
fn finish_flushes_an_unterminated_set() {
// Defensive: a stream that ends without a final terminator must not
// lose the in-flight rows.
let mut c = ResultSetCollector::new(None);
c.set_columns(vec!["id".to_string()]);
c.push_row(row(1));

let sets = c.finish();
assert_eq!(sets.len(), 1);
assert_eq!(sets[0].rows.len(), 1);
}
}
2 changes: 2 additions & 0 deletions src-tauri/src/drivers/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@ async fn exec_on_pg_client(
affected_rows: affected,
truncated: false,
pagination: None,
additional_results: None,
});
}

Expand Down Expand Up @@ -920,6 +921,7 @@ async fn exec_on_pg_client(
affected_rows: 0,
truncated,
pagination,
additional_results: None,
})
}

Expand Down
2 changes: 2 additions & 0 deletions src-tauri/src/drivers/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ async fn exec_on_sqlite_conn(
affected_rows: exec_result.rows_affected(),
truncated: false,
pagination: None,
additional_results: None,
});
}

Expand Down Expand Up @@ -605,6 +606,7 @@ async fn exec_on_sqlite_conn(
affected_rows: 0,
truncated,
pagination,
additional_results: None,
})
}

Expand Down
6 changes: 6 additions & 0 deletions src-tauri/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ pub struct QueryResult {
#[serde(default)]
pub truncated: bool,
pub pagination: Option<Pagination>,
/// Extra result sets produced by a single statement beyond the first one,
/// e.g. a MySQL `CALL` to a stored procedure containing multiple `SELECT`s.
/// The first result set stays in `columns` / `rows` so consumers unaware
/// of multi-result statements keep working unchanged.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub additional_results: Option<Vec<QueryResult>>,
}

/// One statement's outcome within an `execute_batch` call. Exactly one of
Expand Down
2 changes: 1 addition & 1 deletion src/i18n/locales/de.json
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@
"selectTypeFirst": "Zuerst Kontext/Namespace/Typ auswählen",
"useK8s": "Kubernetes-Port-Forward verwenden",
"useK8sConnection": "Gespeicherte Verbindung",
"advanced": "Erweitert",
"startupScript": "Startskript",
"startupScriptDescription": "SQL, das bei jeder neuen Verbindung zu dieser Datenquelle ausgeführt wird. Verwenden Sie es für Sitzungseinstellungen wie SET / set_config (z. B. zum Umgehen von RLS). Trennen Sie Anweisungen mit Semikolons.",
"startupScriptPlaceholder": "SELECT set_config('app.bypass_rls', 'on', false);",
Expand Down Expand Up @@ -1091,6 +1090,7 @@
"viewTabs": "Tab-Ansicht",
"viewStacked": "Gestapelte Ansicht",
"queryPrefix": "Abfrage",
"resultSetPrefix": "Ergebnis",
"results": "Ergebnisse",
"collapseAll": "Alle einklappen",
"expandAll": "Alle ausklappen",
Expand Down
Loading