From d67dcb307fc0fb9e06d5fea9740e18a214209812 Mon Sep 17 00:00:00 2001 From: "chandr-andr (Kiselev Aleksandr)" Date: Mon, 27 Jan 2025 00:31:43 +0100 Subject: [PATCH 1/2] Small changes in doc Signed-off-by: chandr-andr (Kiselev Aleksandr) --- docs/components/listener.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/components/listener.md b/docs/components/listener.md index 1ad98787..000067ac 100644 --- a/docs/components/listener.md +++ b/docs/components/listener.md @@ -121,7 +121,7 @@ async def main() -> None: - `channel`: name of the channel to listen. - `callback`: coroutine callback. -Add new callback to the channel, can be called more than 1 times. +Add new callback to the channel, can be called multiple times (before or after `listen`). Callback signature is like this: ```python From f2884d184408ae6715b8f485a31f95e885aae83c Mon Sep 17 00:00:00 2001 From: "chandr-andr (Kiselev Aleksandr)" Date: Sat, 8 Feb 2025 19:05:57 +0100 Subject: [PATCH 2/2] Refactored execute-like methods Signed-off-by: chandr-andr (Kiselev Aleksandr) --- src/common.rs | 110 +--------- src/driver/connection.rs | 356 ++------------------------------- src/driver/connection_pool.rs | 3 +- src/driver/cursor.rs | 29 ++- src/driver/inner_connection.rs | 267 +++++++++++++++++++++++++ src/driver/listener/core.rs | 3 +- src/driver/mod.rs | 1 + src/driver/transaction.rs | 133 ++---------- 8 files changed, 312 insertions(+), 590 deletions(-) create mode 100644 src/driver/inner_connection.rs diff --git a/src/common.rs b/src/common.rs index 8dc70fc3..d0ec15e4 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,13 +1,6 @@ use pyo3::{ types::{PyAnyMethods, PyModule, PyModuleMethods}, - Bound, PyAny, PyResult, Python, -}; - -use crate::{ - driver::connection::PsqlpyConnection, - exceptions::rust_errors::RustPSQLDriverPyResult, - query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, - value_converter::{convert_parameters, PythonDTO, QueryParameter}, + Bound, PyResult, Python, }; /// Add new module to the parent one. @@ -33,104 +26,3 @@ pub fn add_module( )?; Ok(()) } - -pub trait ObjectQueryTrait { - fn psqlpy_query_one( - &self, - querystring: String, - parameters: Option>, - prepared: Option, - ) -> impl std::future::Future> + Send; - - fn psqlpy_query( - &self, - querystring: String, - parameters: Option>, - prepared: Option, - ) -> impl std::future::Future> + Send; - - fn psqlpy_query_simple( - &self, - querystring: String, - ) -> impl std::future::Future> + Send; -} - -impl ObjectQueryTrait for PsqlpyConnection { - async fn psqlpy_query_one( - &self, - querystring: String, - parameters: Option>, - prepared: Option, - ) -> RustPSQLDriverPyResult { - let mut params: Vec = vec![]; - if let Some(parameters) = parameters { - params = convert_parameters(parameters)?; - } - let prepared = prepared.unwrap_or(true); - - let result = if prepared { - self.query_one( - &self.prepare_cached(&querystring).await?, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await? - } else { - self.query_one( - &querystring, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await? - }; - - Ok(PSQLDriverSinglePyQueryResult::new(result)) - } - - async fn psqlpy_query( - &self, - querystring: String, - parameters: Option>, - prepared: Option, - ) -> RustPSQLDriverPyResult { - let mut params: Vec = vec![]; - if let Some(parameters) = parameters { - params = convert_parameters(parameters)?; - } - let prepared = prepared.unwrap_or(true); - - let result = if prepared { - self.query( - &self.prepare_cached(&querystring).await?, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await? - } else { - self.query( - &querystring, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await? - }; - - Ok(PSQLDriverPyQueryResult::new(result)) - } - - async fn psqlpy_query_simple(&self, querystring: String) -> RustPSQLDriverPyResult<()> { - self.batch_execute(querystring.as_str()).await - } -} diff --git a/src/driver/connection.rs b/src/driver/connection.rs index f10328c2..2d747225 100644 --- a/src/driver/connection.rs +++ b/src/driver/connection.rs @@ -1,114 +1,24 @@ -use bytes::{Buf, BytesMut}; -use deadpool_postgres::{Object, Pool}; +use bytes::BytesMut; +use deadpool_postgres::Pool; use futures_util::pin_mut; -use postgres_types::ToSql; use pyo3::{buffer::PyBuffer, pyclass, pymethods, Py, PyAny, PyErr, Python}; -use std::{collections::HashSet, sync::Arc, vec}; -use tokio_postgres::{ - binary_copy::BinaryCopyInWriter, Client, CopyInSink, Row, Statement, ToStatement, -}; +use std::{collections::HashSet, sync::Arc}; +use tokio_postgres::binary_copy::BinaryCopyInWriter; use crate::{ exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult}, format_helpers::quote_ident, query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, runtime::tokio_runtime, - value_converter::{convert_parameters, postgres_to_py, PythonDTO, QueryParameter}, }; use super::{ cursor::Cursor, + inner_connection::PsqlpyConnection, transaction::Transaction, transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit}, }; -#[allow(clippy::module_name_repetitions)] -pub enum PsqlpyConnection { - PoolConn(Object), - SingleConn(Client), -} - -impl PsqlpyConnection { - /// Prepare cached statement. - /// - /// # Errors - /// May return Err if cannot prepare statement. - pub async fn prepare_cached(&self, query: &str) -> RustPSQLDriverPyResult { - match self { - PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.prepare_cached(query).await?), - PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.prepare(query).await?), - } - } - - /// Prepare cached statement. - /// - /// # Errors - /// May return Err if cannot execute statement. - pub async fn query( - &self, - statement: &T, - params: &[&(dyn ToSql + Sync)], - ) -> RustPSQLDriverPyResult> - where - T: ?Sized + ToStatement, - { - match self { - PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.query(statement, params).await?), - PsqlpyConnection::SingleConn(sconn) => { - return Ok(sconn.query(statement, params).await?) - } - } - } - - /// Prepare cached statement. - /// - /// # Errors - /// May return Err if cannot execute statement. - pub async fn batch_execute(&self, query: &str) -> RustPSQLDriverPyResult<()> { - match self { - PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.batch_execute(query).await?), - PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.batch_execute(query).await?), - } - } - - /// Prepare cached statement. - /// - /// # Errors - /// May return Err if cannot execute statement. - pub async fn query_one( - &self, - statement: &T, - params: &[&(dyn ToSql + Sync)], - ) -> RustPSQLDriverPyResult - where - T: ?Sized + ToStatement, - { - match self { - PsqlpyConnection::PoolConn(pconn) => { - return Ok(pconn.query_one(statement, params).await?) - } - PsqlpyConnection::SingleConn(sconn) => { - return Ok(sconn.query_one(statement, params).await?) - } - } - } - - /// Prepare cached statement. - /// - /// # Errors - /// May return Err if cannot execute copy data. - pub async fn copy_in(&self, statement: &T) -> RustPSQLDriverPyResult> - where - T: ?Sized + ToStatement, - U: Buf + 'static + Send, - { - match self { - PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.copy_in(statement).await?), - PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.copy_in(statement).await?), - } - } -} - #[pyclass(subclass)] #[derive(Clone)] pub struct Connection { @@ -213,54 +123,7 @@ impl Connection { let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone()); if let Some(db_client) = db_client { - let mut params: Vec = vec![]; - if let Some(parameters) = parameters { - params = convert_parameters(parameters)?; - } - let prepared = prepared.unwrap_or(true); - - let result = if prepared { - db_client - .query( - &db_client - .prepare_cached(&querystring) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot prepare statement, error - {err}" - )) - })?, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot execute statement, error - {err}" - )) - })? - } else { - db_client - .query( - &querystring, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot execute statement, error - {err}" - )) - })? - }; - - return Ok(PSQLDriverPyQueryResult::new(result)); + return db_client.execute(querystring, parameters, prepared).await; } Err(RustPSQLDriverError::ConnectionClosedError) @@ -311,60 +174,9 @@ impl Connection { let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone()); if let Some(db_client) = db_client { - let mut params: Vec> = vec![]; - if let Some(parameters) = parameters { - for vec_of_py_any in parameters { - params.push(convert_parameters(vec_of_py_any)?); - } - } - let prepared = prepared.unwrap_or(true); - - db_client.batch_execute("BEGIN;").await.map_err(|err| { - RustPSQLDriverError::TransactionBeginError(format!( - "Cannot start transaction to run execute_many: {err}" - )) - })?; - for param in params { - let querystring_result = if prepared { - let prepared_stmt = &db_client.prepare_cached(&querystring).await; - if let Err(error) = prepared_stmt { - return Err(RustPSQLDriverError::TransactionExecuteError(format!( - "Cannot prepare statement in execute_many, operation rolled back {error}", - ))); - } - db_client - .query( - &db_client.prepare_cached(&querystring).await?, - ¶m - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - } else { - db_client - .query( - &querystring, - ¶m - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - }; - - if let Err(error) = querystring_result { - db_client.batch_execute("ROLLBACK;").await?; - return Err(RustPSQLDriverError::TransactionExecuteError(format!( - "Error occured in `execute_many` statement, transaction is rolled back: {error}" - ))); - } - } - db_client.batch_execute("COMMIT;").await?; - - return Ok(()); + return db_client + .execute_many(querystring, parameters, prepared) + .await; } Err(RustPSQLDriverError::ConnectionClosedError) @@ -388,54 +200,7 @@ impl Connection { let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone()); if let Some(db_client) = db_client { - let mut params: Vec = vec![]; - if let Some(parameters) = parameters { - params = convert_parameters(parameters)?; - } - let prepared = prepared.unwrap_or(true); - - let result = if prepared { - db_client - .query( - &db_client - .prepare_cached(&querystring) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot prepare statement, error - {err}" - )) - })?, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot execute statement, error - {err}" - )) - })? - } else { - db_client - .query( - &querystring, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot execute statement, error - {err}" - )) - })? - }; - - return Ok(PSQLDriverPyQueryResult::new(result)); + return db_client.execute(querystring, parameters, prepared).await; } Err(RustPSQLDriverError::ConnectionClosedError) @@ -465,54 +230,7 @@ impl Connection { let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone()); if let Some(db_client) = db_client { - let mut params: Vec = vec![]; - if let Some(parameters) = parameters { - params = convert_parameters(parameters)?; - } - let prepared = prepared.unwrap_or(true); - - let result = if prepared { - db_client - .query_one( - &db_client - .prepare_cached(&querystring) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot prepare statement, error - {err}" - )) - })?, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot execute statement, error - {err}" - )) - })? - } else { - db_client - .query_one( - &querystring, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot execute statement, error - {err}" - )) - })? - }; - - return Ok(PSQLDriverSinglePyQueryResult::new(result)); + return db_client.fetch_row(querystring, parameters, prepared).await; } Err(RustPSQLDriverError::ConnectionClosedError) @@ -539,57 +257,7 @@ impl Connection { let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone()); if let Some(db_client) = db_client { - let mut params: Vec = vec![]; - if let Some(parameters) = parameters { - params = convert_parameters(parameters)?; - } - let prepared = prepared.unwrap_or(true); - - let result = if prepared { - db_client - .query_one( - &db_client - .prepare_cached(&querystring) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot prepare statement, error - {err}" - )) - })?, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot execute statement, error - {err}" - )) - })? - } else { - db_client - .query_one( - &querystring, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot execute statement, error - {err}" - )) - })? - }; - - return Python::with_gil(|gil| match result.columns().first() { - Some(first_column) => postgres_to_py(gil, &result, first_column, 0, &None), - None => Ok(gil.None()), - }); + return db_client.fetch_val(querystring, parameters, prepared).await; } Err(RustPSQLDriverError::ConnectionClosedError) diff --git a/src/driver/connection_pool.rs b/src/driver/connection_pool.rs index 4f38407d..c2b3046e 100644 --- a/src/driver/connection_pool.rs +++ b/src/driver/connection_pool.rs @@ -12,7 +12,8 @@ use crate::{ use super::{ common_options::{ConnRecyclingMethod, LoadBalanceHosts, SslMode, TargetSessionAttrs}, - connection::{Connection, PsqlpyConnection}, + connection::Connection, + inner_connection::PsqlpyConnection, listener::core::Listener, utils::{build_connection_config, build_manager, build_tls}, }; diff --git a/src/driver/cursor.rs b/src/driver/cursor.rs index 7368d29a..e5147b89 100644 --- a/src/driver/cursor.rs +++ b/src/driver/cursor.rs @@ -5,13 +5,12 @@ use pyo3::{ }; use crate::{ - common::ObjectQueryTrait, exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult}, query_result::PSQLDriverPyQueryResult, runtime::rustdriver_future, }; -use super::connection::PsqlpyConnection; +use super::inner_connection::PsqlpyConnection; /// Additional implementation for the `Object` type. #[allow(clippy::ref_option)] @@ -55,7 +54,7 @@ impl CursorObjectTrait for PsqlpyConnection { cursor_init_query.push_str(format!(" CURSOR FOR {querystring}").as_str()); - self.psqlpy_query(cursor_init_query, parameters.clone(), *prepared) + self.execute(cursor_init_query, parameters.clone(), *prepared) .await .map_err(|err| { RustPSQLDriverError::CursorStartError(format!("Cannot start cursor, error - {err}")) @@ -77,7 +76,7 @@ impl CursorObjectTrait for PsqlpyConnection { )); } - self.psqlpy_query( + self.execute( format!("CLOSE {cursor_name}"), Option::default(), Some(false), @@ -220,7 +219,7 @@ impl Cursor { rustdriver_future(gil, async move { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query( + .execute( format!("FETCH {fetch_number} FROM {cursor_name}"), None, Some(false), @@ -318,7 +317,7 @@ impl Cursor { }; let result = db_transaction - .psqlpy_query( + .execute( format!("FETCH {fetch_number} FROM {cursor_name}"), None, Some(false), @@ -350,7 +349,7 @@ impl Cursor { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query(format!("FETCH NEXT FROM {cursor_name}"), None, Some(false)) + .execute(format!("FETCH NEXT FROM {cursor_name}"), None, Some(false)) .await .map_err(|err| { RustPSQLDriverError::CursorFetchError(format!( @@ -377,7 +376,7 @@ impl Cursor { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query(format!("FETCH PRIOR FROM {cursor_name}"), None, Some(false)) + .execute(format!("FETCH PRIOR FROM {cursor_name}"), None, Some(false)) .await .map_err(|err| { RustPSQLDriverError::CursorFetchError(format!( @@ -404,7 +403,7 @@ impl Cursor { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query(format!("FETCH FIRST FROM {cursor_name}"), None, Some(false)) + .execute(format!("FETCH FIRST FROM {cursor_name}"), None, Some(false)) .await .map_err(|err| { RustPSQLDriverError::CursorFetchError(format!( @@ -431,7 +430,7 @@ impl Cursor { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query(format!("FETCH LAST FROM {cursor_name}"), None, Some(false)) + .execute(format!("FETCH LAST FROM {cursor_name}"), None, Some(false)) .await .map_err(|err| { RustPSQLDriverError::CursorFetchError(format!( @@ -461,7 +460,7 @@ impl Cursor { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query( + .execute( format!("FETCH ABSOLUTE {absolute_number} FROM {cursor_name}"), None, Some(false), @@ -495,7 +494,7 @@ impl Cursor { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query( + .execute( format!("FETCH RELATIVE {relative_number} FROM {cursor_name}"), None, Some(false), @@ -528,7 +527,7 @@ impl Cursor { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query( + .execute( format!("FETCH FORWARD ALL FROM {cursor_name}"), None, Some(false), @@ -562,7 +561,7 @@ impl Cursor { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query( + .execute( format!("FETCH BACKWARD {backward_count} FROM {cursor_name}",), None, Some(false), @@ -595,7 +594,7 @@ impl Cursor { if let Some(db_transaction) = db_transaction { let result = db_transaction - .psqlpy_query( + .execute( format!("FETCH BACKWARD ALL FROM {cursor_name}"), None, Some(false), diff --git a/src/driver/inner_connection.rs b/src/driver/inner_connection.rs new file mode 100644 index 00000000..c66006cc --- /dev/null +++ b/src/driver/inner_connection.rs @@ -0,0 +1,267 @@ +use bytes::Buf; +use deadpool_postgres::Object; +use postgres_types::ToSql; +use pyo3::{Py, PyAny, Python}; +use std::vec; +use tokio_postgres::{Client, CopyInSink, Row, Statement, ToStatement}; + +use crate::{ + exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult}, + query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, + value_converter::{convert_parameters, postgres_to_py, PythonDTO, QueryParameter}, +}; + +#[allow(clippy::module_name_repetitions)] +pub enum PsqlpyConnection { + PoolConn(Object), + SingleConn(Client), +} + +impl PsqlpyConnection { + /// Prepare cached statement. + /// + /// # Errors + /// May return Err if cannot prepare statement. + pub async fn prepare_cached(&self, query: &str) -> RustPSQLDriverPyResult { + match self { + PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.prepare_cached(query).await?), + PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.prepare(query).await?), + } + } + + /// Prepare cached statement. + /// + /// # Errors + /// May return Err if cannot execute statement. + pub async fn query( + &self, + statement: &T, + params: &[&(dyn ToSql + Sync)], + ) -> RustPSQLDriverPyResult> + where + T: ?Sized + ToStatement, + { + match self { + PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.query(statement, params).await?), + PsqlpyConnection::SingleConn(sconn) => { + return Ok(sconn.query(statement, params).await?) + } + } + } + + /// Prepare cached statement. + /// + /// # Errors + /// May return Err if cannot execute statement. + pub async fn batch_execute(&self, query: &str) -> RustPSQLDriverPyResult<()> { + match self { + PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.batch_execute(query).await?), + PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.batch_execute(query).await?), + } + } + + /// Prepare cached statement. + /// + /// # Errors + /// May return Err if cannot execute statement. + pub async fn query_one( + &self, + statement: &T, + params: &[&(dyn ToSql + Sync)], + ) -> RustPSQLDriverPyResult + where + T: ?Sized + ToStatement, + { + match self { + PsqlpyConnection::PoolConn(pconn) => { + return Ok(pconn.query_one(statement, params).await?) + } + PsqlpyConnection::SingleConn(sconn) => { + return Ok(sconn.query_one(statement, params).await?) + } + } + } + + pub async fn execute( + &self, + querystring: String, + parameters: Option>, + prepared: Option, + ) -> RustPSQLDriverPyResult { + let prepared = prepared.unwrap_or(true); + + let mut params: Vec = vec![]; + if let Some(parameters) = parameters { + params = convert_parameters(parameters)?; + } + + let boxed_params = ¶ms + .iter() + .map(|param| param as &QueryParameter) + .collect::>() + .into_boxed_slice(); + + let result = if prepared { + self.query( + &self.prepare_cached(&querystring).await.map_err(|err| { + RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot prepare statement, error - {err}" + )) + })?, + boxed_params, + ) + .await + .map_err(|err| { + RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot execute statement, error - {err}" + )) + })? + } else { + self.query(&querystring, boxed_params) + .await + .map_err(|err| { + RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot execute statement, error - {err}" + )) + })? + }; + + Ok(PSQLDriverPyQueryResult::new(result)) + } + + pub async fn execute_many( + &self, + querystring: String, + parameters: Option>>, + prepared: Option, + ) -> RustPSQLDriverPyResult<()> { + let prepared = prepared.unwrap_or(true); + + let mut params: Vec> = vec![]; + if let Some(parameters) = parameters { + for vec_of_py_any in parameters { + params.push(convert_parameters(vec_of_py_any)?); + } + } + + for param in params { + let boxed_params = ¶m + .iter() + .map(|param| param as &QueryParameter) + .collect::>() + .into_boxed_slice(); + + let querystring_result = if prepared { + let prepared_stmt = &self.prepare_cached(&querystring).await; + if let Err(error) = prepared_stmt { + return Err(RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot prepare statement in execute_many, operation rolled back {error}", + ))); + } + self.query(&self.prepare_cached(&querystring).await?, boxed_params) + .await + } else { + self.query(&querystring, boxed_params).await + }; + + if let Err(error) = querystring_result { + return Err(RustPSQLDriverError::ConnectionExecuteError(format!( + "Error occured in `execute_many` statement: {error}" + ))); + } + } + + return Ok(()); + } + + pub async fn fetch_row_raw( + &self, + querystring: String, + parameters: Option>, + prepared: Option, + ) -> RustPSQLDriverPyResult { + let prepared = prepared.unwrap_or(true); + + let mut params: Vec = vec![]; + if let Some(parameters) = parameters { + params = convert_parameters(parameters)?; + } + + let boxed_params = ¶ms + .iter() + .map(|param| param as &QueryParameter) + .collect::>() + .into_boxed_slice(); + + let result = if prepared { + self.query_one( + &self.prepare_cached(&querystring).await.map_err(|err| { + RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot prepare statement, error - {err}" + )) + })?, + boxed_params, + ) + .await + .map_err(|err| { + RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot execute statement, error - {err}" + )) + })? + } else { + self.query_one(&querystring, boxed_params) + .await + .map_err(|err| { + RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot execute statement, error - {err}" + )) + })? + }; + + return Ok(result); + } + + pub async fn fetch_row( + &self, + querystring: String, + parameters: Option>, + prepared: Option, + ) -> RustPSQLDriverPyResult { + let result = self + .fetch_row_raw(querystring, parameters, prepared) + .await?; + + return Ok(PSQLDriverSinglePyQueryResult::new(result)); + } + + pub async fn fetch_val( + &self, + querystring: String, + parameters: Option>, + prepared: Option, + ) -> RustPSQLDriverPyResult> { + let result = self + .fetch_row_raw(querystring, parameters, prepared) + .await?; + + return Python::with_gil(|gil| match result.columns().first() { + Some(first_column) => postgres_to_py(gil, &result, first_column, 0, &None), + None => Ok(gil.None()), + }); + } + + /// Prepare cached statement. + /// + /// # Errors + /// May return Err if cannot execute copy data. + pub async fn copy_in(&self, statement: &T) -> RustPSQLDriverPyResult> + where + T: ?Sized + ToStatement, + U: Buf + 'static + Send, + { + match self { + PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.copy_in(statement).await?), + PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.copy_in(statement).await?), + } + } +} diff --git a/src/driver/listener/core.rs b/src/driver/listener/core.rs index c8fd271c..a79cdab6 100644 --- a/src/driver/listener/core.rs +++ b/src/driver/listener/core.rs @@ -14,7 +14,8 @@ use tokio_postgres::{AsyncMessage, Config}; use crate::{ driver::{ common_options::SslMode, - connection::{Connection, PsqlpyConnection}, + connection::Connection, + inner_connection::PsqlpyConnection, utils::{build_tls, is_coroutine_function, ConfiguredTLS}, }, exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult}, diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 578bf2cd..e7827cd5 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -3,6 +3,7 @@ pub mod connection; pub mod connection_pool; pub mod connection_pool_builder; pub mod cursor; +pub mod inner_connection; pub mod listener; pub mod transaction; pub mod transaction_options; diff --git a/src/driver/transaction.rs b/src/driver/transaction.rs index 3fa59e4d..4cc3655a 100644 --- a/src/driver/transaction.rs +++ b/src/driver/transaction.rs @@ -12,15 +12,13 @@ use crate::{ exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult}, format_helpers::quote_ident, query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, - value_converter::{convert_parameters, postgres_to_py, PythonDTO, QueryParameter}, }; use super::{ - connection::PsqlpyConnection, cursor::Cursor, + inner_connection::PsqlpyConnection, transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit}, }; -use crate::common::ObjectQueryTrait; use std::{collections::HashSet, sync::Arc}; #[allow(clippy::module_name_repetitions)] @@ -328,9 +326,7 @@ impl Transaction { }); is_transaction_ready?; if let Some(db_client) = db_client { - return db_client - .psqlpy_query(querystring, parameters, prepared) - .await; + return db_client.execute(querystring, parameters, prepared).await; } Err(RustPSQLDriverError::TransactionClosedError) @@ -384,9 +380,7 @@ impl Transaction { }); is_transaction_ready?; if let Some(db_client) = db_client { - return db_client - .psqlpy_query(querystring, parameters, prepared) - .await; + return db_client.execute(querystring, parameters, prepared).await; } Err(RustPSQLDriverError::TransactionClosedError) @@ -420,36 +414,7 @@ impl Transaction { is_transaction_ready?; if let Some(db_client) = db_client { - let mut params: Vec = vec![]; - if let Some(parameters) = parameters { - params = convert_parameters(parameters)?; - } - - let result = if prepared.unwrap_or(true) { - db_client - .query_one( - &db_client.prepare_cached(&querystring).await?, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await? - } else { - db_client - .query_one( - &querystring, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await? - }; - - return Ok(PSQLDriverSinglePyQueryResult::new(result)); + return db_client.fetch_row(querystring, parameters, prepared).await; } Err(RustPSQLDriverError::TransactionClosedError) @@ -476,41 +441,9 @@ impl Transaction { let self_ = self_.borrow(gil); (self_.check_is_transaction_ready(), self_.db_client.clone()) }); + is_transaction_ready?; if let Some(db_client) = db_client { - is_transaction_ready?; - let mut params: Vec = vec![]; - if let Some(parameters) = parameters { - params = convert_parameters(parameters)?; - } - - let result = if prepared.unwrap_or(true) { - db_client - .query_one( - &db_client.prepare_cached(&querystring).await?, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await? - } else { - db_client - .query_one( - &querystring, - ¶ms - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await? - }; - - return Python::with_gil(|gil| match result.columns().first() { - Some(first_column) => postgres_to_py(gil, &result, first_column, 0, &None), - None => Ok(gil.None()), - }); + return db_client.fetch_val(querystring, parameters, prepared).await; } Err(RustPSQLDriverError::TransactionClosedError) @@ -537,51 +470,11 @@ impl Transaction { (self_.check_is_transaction_ready(), self_.db_client.clone()) }); + is_transaction_ready?; if let Some(db_client) = db_client { - is_transaction_ready?; - - let mut params: Vec> = vec![]; - if let Some(parameters) = parameters { - for vec_of_py_any in parameters { - params.push(convert_parameters(vec_of_py_any)?); - } - } - let prepared = prepared.unwrap_or(true); - - for param in params { - let is_query_result_ok = if prepared { - let prepared_stmt = &db_client.prepare_cached(&querystring).await; - if let Err(error) = prepared_stmt { - return Err(RustPSQLDriverError::TransactionExecuteError(format!( - "Cannot prepare statement in execute_many, operation rolled back {error}", - ))); - } - db_client - .query( - &db_client.prepare_cached(&querystring).await?, - ¶m - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - } else { - db_client - .query( - &querystring, - ¶m - .iter() - .map(|param| param as &QueryParameter) - .collect::>() - .into_boxed_slice(), - ) - .await - }; - is_query_result_ok?; - } - - return Ok(()); + return db_client + .execute_many(querystring, parameters, prepared) + .await; } Err(RustPSQLDriverError::TransactionClosedError) @@ -804,9 +697,9 @@ impl Transaction { (self_.check_is_transaction_ready(), self_.db_client.clone()) }); - if let Some(db_client) = db_client { - is_transaction_ready?; + is_transaction_ready?; + if let Some(db_client) = db_client { let mut futures = vec![]; if let Some(queries) = queries { let gil_result = pyo3::Python::with_gil(|gil| -> PyResult<()> { @@ -822,7 +715,7 @@ impl Transaction { Ok(param) => Some(param.into()), Err(_) => None, }; - futures.push(db_client.psqlpy_query(querystring, params, prepared)); + futures.push(db_client.execute(querystring, params, prepared)); } Ok(()) });