From 8af24c1dee1d46ee4c0eef8fad2adcb9c155c128 Mon Sep 17 00:00:00 2001 From: Martin Garton Date: Mon, 1 Dec 2025 08:32:40 +0000 Subject: [PATCH 1/3] Improve `pg_get_constraintdef` UDF `pg_get_constraintdef` can optionally take a boolean as a second parameter, so update the UDF to support this. --- datafusion-pg-catalog/src/pg_catalog.rs | 68 ++++++++++++++++++------- 1 file changed, 51 insertions(+), 17 deletions(-) diff --git a/datafusion-pg-catalog/src/pg_catalog.rs b/datafusion-pg-catalog/src/pg_catalog.rs index 195c440..27c895a 100644 --- a/datafusion-pg-catalog/src/pg_catalog.rs +++ b/datafusion-pg-catalog/src/pg_catalog.rs @@ -14,7 +14,9 @@ use datafusion::catalog::{MemTable, SchemaProvider, TableFunctionImpl}; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result}; -use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility}; +use datafusion::logical_expr::{ + ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility, +}; use datafusion::physical_plan::streaming::PartitionStream; use datafusion::prelude::{create_udf, Expr, SessionContext}; use postgres_types::Oid; @@ -1348,26 +1350,58 @@ pub fn create_pg_stat_get_numscans() -> ScalarUDF { } pub fn create_pg_get_constraintdef() -> ScalarUDF { - let func = move |args: &[ColumnarValue]| { - let args = ColumnarValue::values_to_arrays(args)?; - let oids = &args[0].as_primitive::<Int32Type>(); + #[derive(Debug, PartialEq, Eq, Hash)] + struct GetConstraintDefUDF { + signature: Signature, + } - let mut builder = StringBuilder::new(); - for _ in 0..oids.len() { - builder.append_value(""); + impl GetConstraintDefUDF { + fn new() -> Self { + let type_signature = TypeSignature::OneOf(vec![ + TypeSignature::Exact(vec![DataType::Int32]), + TypeSignature::Exact(vec![DataType::Int32, DataType::Boolean]), + ]); + + let signature = Signature::new(type_signature, Volatility::Stable); + GetConstraintDefUDF {
signature } } + } - let array: ArrayRef = Arc::new(builder.finish()); - Ok(ColumnarValue::Array(array)) - }; + impl ScalarUDFImpl for GetConstraintDefUDF { + fn as_any(&self) -> &dyn std::any::Any { + self + } - create_udf( - "pg_get_constraintdef", - vec![DataType::Int32], - DataType::Utf8, - Volatility::Stable, - Arc::new(func), - ) + fn name(&self) -> &str { + "pg_get_constraintdef" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + Ok(DataType::Utf8) + } + + fn invoke_with_args( + &self, + args: datafusion::logical_expr::ScalarFunctionArgs, + ) -> Result<ColumnarValue> { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let oids = &args[0].as_primitive::<Int32Type>(); + + let mut builder = StringBuilder::new(); + for _ in 0..oids.len() { + builder.append_value(""); + } + + let array: ArrayRef = Arc::new(builder.finish()); + Ok(ColumnarValue::Array(array)) + } + } + + GetConstraintDefUDF::new().into() } /// Install pg_catalog and postgres UDFs to current `SessionContext` From 3ba7bd994c57f93bf11fda71a93915100bb1da31 Mon Sep 17 00:00:00 2001 From: Martin Garton Date: Mon, 1 Dec 2025 08:40:31 +0000 Subject: [PATCH 2/3] Implement `pg_partition_ancestors` Add a UDF for `pg_partition_ancestors` --- datafusion-pg-catalog/src/pg_catalog.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/datafusion-pg-catalog/src/pg_catalog.rs b/datafusion-pg-catalog/src/pg_catalog.rs index 27c895a..88651d4 100644 --- a/datafusion-pg-catalog/src/pg_catalog.rs +++ b/datafusion-pg-catalog/src/pg_catalog.rs @@ -1404,6 +1404,27 @@ pub fn create_pg_get_constraintdef() -> ScalarUDF { GetConstraintDefUDF::new().into() } +pub fn create_pg_get_partition_ancestors_udf() -> ScalarUDF { + let func = move |args: &[ColumnarValue]| { + let args = ColumnarValue::values_to_arrays(args)?; + let string_array = args[0].as_string::<i32>(); + + let mut builder = StringBuilder::new(); + string_array.iter().for_each(|i|
builder.append_option(i)); + let array: ArrayRef = Arc::new(builder.finish()); + + Ok(ColumnarValue::Array(array)) + }; + + create_udf( + "pg_partition_ancestors", + vec![DataType::Utf8], + DataType::Utf8, + Volatility::Stable, + Arc::new(func), + ) +} + /// Install pg_catalog and postgres UDFs to current `SessionContext` pub fn setup_pg_catalog<P>( session_context: &SessionContext, @@ -1459,6 +1480,7 @@ where session_context.register_udf(create_pg_total_relation_size_udf()); session_context.register_udf(create_pg_stat_get_numscans()); session_context.register_udf(create_pg_get_constraintdef()); + session_context.register_udf(create_pg_get_partition_ancestors_udf()); Ok(()) } From 0c0db43498c2f894b2148a2aaeac782e622aaabe Mon Sep 17 00:00:00 2001 From: Martin Garton Date: Mon, 1 Dec 2025 08:56:11 +0000 Subject: [PATCH 3/3] Add tests for describe table in psql Add test for describing an example table (`\d customer`) to ensure we handle it properly. Most of the queries already worked fine (only 2 failed) but I added them all for completeness. --- datafusion-postgres/tests/psql.rs | 104 ++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/datafusion-postgres/tests/psql.rs b/datafusion-postgres/tests/psql.rs index e960757..1f23561 100644 --- a/datafusion-postgres/tests/psql.rs +++ b/datafusion-postgres/tests/psql.rs @@ -104,6 +104,110 @@ const PSQL_QUERIES: &[&str] = &[ CASE WHEN pg_catalog.array_length(d.datacl, 1) = 0 THEN '(none)' ELSE pg_catalog.array_to_string(d.datacl, E'\n') END AS "Access privileges" FROM pg_catalog.pg_database d ORDER BY 1;"#, + + // Queries from describing a table, for example `\d customer` + + r#"SELECT c.oid, + n.nspname, + c.relname + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE c.relname OPERATOR(pg_catalog.~) '^(customer)$' COLLATE pg_catalog.default + AND pg_catalog.pg_table_is_visible(c.oid) + ORDER BY 2, 3;"#, + + r#"SELECT a.attname, + pg_catalog.format_type(a.atttypid, a.atttypmod), + (SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid, true) + FROM pg_catalog.pg_attrdef d + WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef), + a.attnotnull, + (SELECT c.collname FROM pg_catalog.pg_collation c, pg_catalog.pg_type t + WHERE c.oid = a.attcollation AND t.oid = a.atttypid AND
a.attcollation <> t.typcollation) AS attcollation, + a.attidentity, + a.attgenerated + FROM pg_catalog.pg_attribute a + WHERE a.attrelid = '16417' AND a.attnum > 0 AND NOT a.attisdropped + ORDER BY a.attnum;"#, + + + r#"SELECT true as sametable, conname, + pg_catalog.pg_get_constraintdef(r.oid, true) as condef, + conrelid::pg_catalog.regclass AS ontable + FROM pg_catalog.pg_constraint r + WHERE r.conrelid = '16417' AND r.contype = 'f' + AND conparentid = 0 + ORDER BY conname;"#, + + r#"SELECT conname, conrelid::pg_catalog.regclass AS ontable, + pg_catalog.pg_get_constraintdef(oid, true) AS condef + FROM pg_catalog.pg_constraint c + WHERE confrelid IN (SELECT pg_catalog.pg_partition_ancestors('16417') + UNION ALL VALUES ('16417'::pg_catalog.regclass)) + AND contype = 'f' AND conparentid = 0 + ORDER BY conname;"#, + + r#"SELECT pol.polname, pol.polpermissive, + CASE WHEN pol.polroles = '{0}' THEN NULL ELSE pg_catalog.array_to_string(array(select rolname from pg_catalog.pg_roles where oid = any (pol.polroles) order by 1),',') END, + pg_catalog.pg_get_expr(pol.polqual, pol.polrelid), + pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid), + CASE pol.polcmd + WHEN 'r' THEN 'SELECT' + WHEN 'a' THEN 'INSERT' + WHEN 'w' THEN 'UPDATE' + WHEN 'd' THEN 'DELETE' + END AS cmd + FROM pg_catalog.pg_policy pol + WHERE pol.polrelid = '16417' ORDER BY 1;"#, + + r#"SELECT oid, stxrelid::pg_catalog.regclass, stxnamespace::pg_catalog.regnamespace::pg_catalog.text AS nsp, stxname, + pg_catalog.pg_get_statisticsobjdef_columns(oid) AS columns, + 'd' = any(stxkind) AS ndist_enabled, + 'f' = any(stxkind) AS deps_enabled, + 'm' = any(stxkind) AS mcv_enabled, + stxstattarget + FROM pg_catalog.pg_statistic_ext + WHERE stxrelid = '16417' + ORDER BY nsp, stxname;"#, + + r#"SELECT pubname + , NULL + , NULL + FROM pg_catalog.pg_publication p + JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid + JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid + WHERE pc.oid ='16417' 
and pg_catalog.pg_relation_is_publishable('16417') + UNION + SELECT pubname + , pg_get_expr(pr.prqual, c.oid) + , (CASE WHEN pr.prattrs IS NOT NULL THEN + (SELECT string_agg(attname, ', ') + FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s, + pg_catalog.pg_attribute + WHERE attrelid = pr.prrelid AND attnum = prattrs[s]) + ELSE NULL END) FROM pg_catalog.pg_publication p + JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid + JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid + WHERE pr.prrelid = '16417' + UNION + SELECT pubname + , NULL + , NULL + FROM pg_catalog.pg_publication p + WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('16417') + ORDER BY 1;"#, + + r#"SELECT c.oid::pg_catalog.regclass + FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i + WHERE c.oid = i.inhparent AND i.inhrelid = '16417' + AND c.relkind != 'p' AND c.relkind != 'I' + ORDER BY inhseqno;"#, + + r#"SELECT c.oid::pg_catalog.regclass, c.relkind, inhdetachpending, pg_catalog.pg_get_expr(c.relpartbound, c.oid) + FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i + WHERE c.oid = i.inhrelid AND i.inhparent = '16417' + ORDER BY pg_catalog.pg_get_expr(c.relpartbound, c.oid) = 'DEFAULT', c.oid::pg_catalog.regclass::pg_catalog.text;"#, + ]; #[tokio::test]