Skip to content

Commit 23aeb66

Browse files
committed
Prepared psqlpy for OTLP
Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
1 parent 2e478e2 commit 23aeb66

File tree

6 files changed

+257
-25
lines changed

6 files changed

+257
-25
lines changed

python/psqlpy/_internal/__init__.pyi

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,16 @@ class Cursor:
288288
It can be used as an asynchronous iterator.
289289
"""
290290

291+
cursor_name: str
292+
querystring: str
293+
parameters: Sequence[Any]
294+
prepared: bool | None
295+
conn_dbname: str | None
296+
user: str | None
297+
host_addrs: list[str]
298+
hosts: list[str]
299+
ports: list[int]
300+
291301
def __aiter__(self: Self) -> Self: ...
292302
async def __anext__(self: Self) -> QueryResult: ...
293303
async def __aenter__(self: Self) -> Self: ...
@@ -424,6 +434,12 @@ class Transaction:
424434
`.transaction()`.
425435
"""
426436

437+
conn_dbname: str | None
438+
user: str | None
439+
host_addrs: list[str]
440+
hosts: list[str]
441+
ports: list[int]
442+
427443
async def __aenter__(self: Self) -> Self: ...
428444
async def __aexit__(
429445
self: Self,
@@ -874,6 +890,12 @@ class Connection:
874890
It can be created only from connection pool.
875891
"""
876892

893+
conn_dbname: str | None
894+
user: str | None
895+
host_addrs: list[str]
896+
hosts: list[str]
897+
ports: list[int]
898+
877899
async def __aenter__(self: Self) -> Self: ...
878900
async def __aexit__(
879901
self: Self,

src/driver/connection.rs

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use bytes::BytesMut;
22
use deadpool_postgres::Pool;
33
use futures_util::pin_mut;
44
use pyo3::{buffer::PyBuffer, pyclass, pymethods, Py, PyAny, PyErr, Python};
5-
use std::{collections::HashSet, sync::Arc};
6-
use tokio_postgres::binary_copy::BinaryCopyInWriter;
5+
use std::{collections::HashSet, net::IpAddr, sync::Arc};
6+
use tokio_postgres::{binary_copy::BinaryCopyInWriter, config::Host, Config};
77

88
use crate::{
99
exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult},
@@ -24,12 +24,21 @@ use super::{
2424
pub struct Connection {
2525
db_client: Option<Arc<PsqlpyConnection>>,
2626
db_pool: Option<Pool>,
27+
pg_config: Arc<Config>,
2728
}
2829

2930
impl Connection {
3031
#[must_use]
31-
pub fn new(db_client: Option<Arc<PsqlpyConnection>>, db_pool: Option<Pool>) -> Self {
32-
Connection { db_client, db_pool }
32+
pub fn new(
33+
db_client: Option<Arc<PsqlpyConnection>>,
34+
db_pool: Option<Pool>,
35+
pg_config: Arc<Config>,
36+
) -> Self {
37+
Connection {
38+
db_client,
39+
db_pool,
40+
pg_config,
41+
}
3342
}
3443

3544
#[must_use]
@@ -45,12 +54,70 @@ impl Connection {
4554

4655
impl Default for Connection {
4756
fn default() -> Self {
48-
Connection::new(None, None)
57+
Connection::new(None, None, Arc::new(Config::default()))
4958
}
5059
}
5160

5261
#[pymethods]
5362
impl Connection {
63+
#[getter]
64+
fn conn_dbname(&self) -> Option<&str> {
65+
self.pg_config.get_dbname()
66+
}
67+
68+
#[getter]
69+
fn user(&self) -> Option<&str> {
70+
self.pg_config.get_user()
71+
}
72+
73+
#[getter]
74+
fn host_addrs(&self) -> Vec<String> {
75+
let mut host_addrs_vec = vec![];
76+
77+
let host_addrs = self.pg_config.get_hostaddrs();
78+
for ip_addr in host_addrs {
79+
match ip_addr {
80+
IpAddr::V4(ipv4) => {
81+
host_addrs_vec.push(ipv4.to_string());
82+
}
83+
IpAddr::V6(ipv6) => {
84+
host_addrs_vec.push(ipv6.to_string());
85+
}
86+
}
87+
}
88+
89+
host_addrs_vec
90+
}
91+
92+
#[getter]
93+
fn hosts(&self) -> Vec<String> {
94+
let mut hosts_vec = vec![];
95+
96+
let hosts = self.pg_config.get_hosts();
97+
for host in hosts {
98+
match host {
99+
Host::Tcp(host) => {
100+
hosts_vec.push(host.to_string());
101+
}
102+
Host::Unix(host) => {
103+
hosts_vec.push(host.display().to_string());
104+
}
105+
}
106+
}
107+
108+
hosts_vec
109+
}
110+
111+
#[getter]
112+
fn ports(&self) -> Vec<&u16> {
113+
return self.pg_config.get_ports().iter().collect::<Vec<&u16>>();
114+
}
115+
116+
#[getter]
117+
fn options(&self) -> Option<&str> {
118+
return self.pg_config.get_options();
119+
}
120+
54121
async fn __aenter__<'a>(self_: Py<Self>) -> RustPSQLDriverPyResult<Py<Self>> {
55122
let (db_client, db_pool) = pyo3::Python::with_gil(|gil| {
56123
let self_ = self_.borrow(gil);
@@ -283,6 +350,7 @@ impl Connection {
283350
if let Some(db_client) = &self.db_client {
284351
return Ok(Transaction::new(
285352
db_client.clone(),
353+
self.pg_config.clone(),
286354
false,
287355
false,
288356
isolation_level,
@@ -318,6 +386,7 @@ impl Connection {
318386
if let Some(db_client) = &self.db_client {
319387
return Ok(Cursor::new(
320388
db_client.clone(),
389+
self.pg_config.clone(),
321390
querystring,
322391
parameters,
323392
"cur_name".into(),

src/driver/connection_pool.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ pub fn connect(
139139
let pool = db_pool_builder.build()?;
140140

141141
Ok(ConnectionPool {
142-
pool,
143-
pg_config,
144-
ca_file,
145-
ssl_mode,
142+
pool: pool,
143+
pg_config: Arc::new(pg_config),
144+
ca_file: ca_file,
145+
ssl_mode: ssl_mode,
146146
})
147147
}
148148

@@ -208,7 +208,7 @@ impl ConnectionPoolStatus {
208208
#[pyclass(subclass)]
209209
pub struct ConnectionPool {
210210
pool: Pool,
211-
pg_config: Config,
211+
pg_config: Arc<Config>,
212212
ca_file: Option<String>,
213213
ssl_mode: Option<SslMode>,
214214
}
@@ -222,10 +222,10 @@ impl ConnectionPool {
222222
ssl_mode: Option<SslMode>,
223223
) -> Self {
224224
ConnectionPool {
225-
pool,
226-
pg_config,
227-
ca_file,
228-
ssl_mode,
225+
pool: pool,
226+
pg_config: Arc::new(pg_config),
227+
ca_file: ca_file,
228+
ssl_mode: ssl_mode,
229229
}
230230
}
231231
}
@@ -499,7 +499,7 @@ impl ConnectionPool {
499499

500500
#[must_use]
501501
pub fn acquire(&self) -> Connection {
502-
Connection::new(None, Some(self.pool.clone()))
502+
Connection::new(None, Some(self.pool.clone()), self.pg_config.clone())
503503
}
504504

505505
#[must_use]
@@ -522,7 +522,10 @@ impl ConnectionPool {
522522
/// # Errors
523523
/// May return Err Result if cannot get new connection from the pool.
524524
pub async fn connection(self_: pyo3::Py<Self>) -> RustPSQLDriverPyResult<Connection> {
525-
let db_pool = pyo3::Python::with_gil(|gil| self_.borrow(gil).pool.clone());
525+
let (db_pool, pg_config) = pyo3::Python::with_gil(|gil| {
526+
let slf = self_.borrow(gil);
527+
(slf.pool.clone(), slf.pg_config.clone())
528+
});
526529
let db_connection = tokio_runtime()
527530
.spawn(async move {
528531
Ok::<deadpool_postgres::Object, RustPSQLDriverError>(db_pool.get().await?)
@@ -532,6 +535,7 @@ impl ConnectionPool {
532535
Ok(Connection::new(
533536
Some(Arc::new(PsqlpyConnection::PoolConn(db_connection))),
534537
None,
538+
pg_config,
535539
))
536540
}
537541

src/driver/cursor.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
use std::sync::Arc;
1+
use std::{net::IpAddr, sync::Arc};
22

33
use pyo3::{
44
exceptions::PyStopAsyncIteration, pyclass, pymethods, Py, PyAny, PyErr, PyObject, Python,
55
};
6+
use tokio_postgres::{config::Host, Config};
67

78
use crate::{
89
exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult},
@@ -90,6 +91,7 @@ impl CursorObjectTrait for PsqlpyConnection {
9091
#[pyclass(subclass)]
9192
pub struct Cursor {
9293
db_transaction: Option<Arc<PsqlpyConnection>>,
94+
pg_config: Arc<Config>,
9395
querystring: String,
9496
parameters: Option<Py<PyAny>>,
9597
cursor_name: String,
@@ -104,6 +106,7 @@ impl Cursor {
104106
#[must_use]
105107
pub fn new(
106108
db_transaction: Arc<PsqlpyConnection>,
109+
pg_config: Arc<Config>,
107110
querystring: String,
108111
parameters: Option<Py<PyAny>>,
109112
cursor_name: String,
@@ -113,6 +116,7 @@ impl Cursor {
113116
) -> Self {
114117
Cursor {
115118
db_transaction: Some(db_transaction),
119+
pg_config,
116120
querystring,
117121
parameters,
118122
cursor_name,
@@ -127,6 +131,79 @@ impl Cursor {
127131

128132
#[pymethods]
129133
impl Cursor {
134+
#[getter]
135+
fn conn_dbname(&self) -> Option<&str> {
136+
self.pg_config.get_dbname()
137+
}
138+
139+
#[getter]
140+
fn user(&self) -> Option<&str> {
141+
self.pg_config.get_user()
142+
}
143+
144+
#[getter]
145+
fn host_addrs(&self) -> Vec<String> {
146+
let mut host_addrs_vec = vec![];
147+
148+
let host_addrs = self.pg_config.get_hostaddrs();
149+
for ip_addr in host_addrs {
150+
match ip_addr {
151+
IpAddr::V4(ipv4) => {
152+
host_addrs_vec.push(ipv4.to_string());
153+
}
154+
IpAddr::V6(ipv6) => {
155+
host_addrs_vec.push(ipv6.to_string());
156+
}
157+
}
158+
}
159+
160+
host_addrs_vec
161+
}
162+
163+
#[getter]
164+
fn hosts(&self) -> Vec<String> {
165+
let mut hosts_vec = vec![];
166+
167+
let hosts = self.pg_config.get_hosts();
168+
for host in hosts {
169+
match host {
170+
Host::Tcp(host) => {
171+
hosts_vec.push(host.to_string());
172+
}
173+
Host::Unix(host) => {
174+
hosts_vec.push(host.display().to_string());
175+
}
176+
}
177+
}
178+
179+
hosts_vec
180+
}
181+
182+
#[getter]
183+
fn ports(&self) -> Vec<&u16> {
184+
return self.pg_config.get_ports().iter().collect::<Vec<&u16>>();
185+
}
186+
187+
#[getter]
188+
fn cursor_name(&self) -> String {
189+
return self.cursor_name.clone();
190+
}
191+
192+
#[getter]
193+
fn querystring(&self) -> String {
194+
return self.querystring.clone();
195+
}
196+
197+
#[getter]
198+
fn parameters(&self) -> Option<Py<PyAny>> {
199+
return self.parameters.clone();
200+
}
201+
202+
#[getter]
203+
fn prepared(&self) -> Option<bool> {
204+
return self.prepared.clone();
205+
}
206+
130207
#[must_use]
131208
fn __aiter__(slf: Py<Self>) -> Py<Self> {
132209
slf

src/driver/listener/core.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use super::structs::{
2828

2929
#[pyclass]
3030
pub struct Listener {
31-
pg_config: Config,
31+
pg_config: Arc<Config>,
3232
ca_file: Option<String>,
3333
ssl_mode: Option<SslMode>,
3434
channel_callbacks: Arc<RwLock<ChannelCallbacks>>,
@@ -42,14 +42,14 @@ pub struct Listener {
4242

4343
impl Listener {
4444
#[must_use]
45-
pub fn new(pg_config: Config, ca_file: Option<String>, ssl_mode: Option<SslMode>) -> Self {
45+
pub fn new(pg_config: Arc<Config>, ca_file: Option<String>, ssl_mode: Option<SslMode>) -> Self {
4646
Listener {
47-
pg_config,
47+
pg_config: pg_config.clone(),
4848
ca_file,
4949
ssl_mode,
5050
channel_callbacks: Arc::default(),
5151
listen_abort_handler: Option::default(),
52-
connection: Connection::new(None, None),
52+
connection: Connection::new(None, None, pg_config.clone()),
5353
receiver: Option::default(),
5454
listen_query: Arc::default(),
5555
is_listened: Arc::new(RwLock::new(false)),
@@ -218,8 +218,11 @@ impl Listener {
218218
tokio_runtime().spawn(connection);
219219

220220
self.receiver = Some(Arc::new(RwLock::new(receiver)));
221-
self.connection =
222-
Connection::new(Some(Arc::new(PsqlpyConnection::SingleConn(client))), None);
221+
self.connection = Connection::new(
222+
Some(Arc::new(PsqlpyConnection::SingleConn(client))),
223+
None,
224+
self.pg_config.clone(),
225+
);
223226

224227
self.is_started = true;
225228

0 commit comments

Comments
 (0)