Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 64 additions & 8 deletions datafusion-pg-catalog/src/pg_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use datafusion::catalog::{MemTable, SchemaProvider, TableFunctionImpl};
use datafusion::common::utils::SingleRowListArrayBuilder;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
use datafusion::logical_expr::{
ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::prelude::{create_udf, Expr, SessionContext};
use postgres_types::Oid;
Expand Down Expand Up @@ -1348,22 +1350,75 @@ pub fn create_pg_stat_get_numscans() -> ScalarUDF {
}

pub fn create_pg_get_constraintdef() -> ScalarUDF {
#[derive(Debug, PartialEq, Eq, Hash)]
struct GetConstraintDefUDF {
signature: Signature,
}

impl GetConstraintDefUDF {
fn new() -> Self {
let type_signature = TypeSignature::OneOf(vec![
TypeSignature::Exact(vec![DataType::Int32]),
TypeSignature::Exact(vec![DataType::Int32, DataType::Boolean]),
]);

let signature = Signature::new(type_signature, Volatility::Stable);
GetConstraintDefUDF { signature }
}
}

impl ScalarUDFImpl for GetConstraintDefUDF {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn name(&self) -> &str {
"pg_get_constraintdef"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
}

fn invoke_with_args(
&self,
args: datafusion::logical_expr::ScalarFunctionArgs,
) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(&args.args)?;
let oids = &args[0].as_primitive::<Int32Type>();

let mut builder = StringBuilder::new();
for _ in 0..oids.len() {
builder.append_value("");
}

let array: ArrayRef = Arc::new(builder.finish());
Ok(ColumnarValue::Array(array))
}
}

GetConstraintDefUDF::new().into()
}

pub fn create_pg_get_partition_ancestors_udf() -> ScalarUDF {
let func = move |args: &[ColumnarValue]| {
let args = ColumnarValue::values_to_arrays(args)?;
let oids = &args[0].as_primitive::<Int32Type>();
let string_array = args[0].as_string::<i32>();

let mut builder = StringBuilder::new();
for _ in 0..oids.len() {
builder.append_value("");
}

string_array.iter().for_each(|i| builder.append_option(i));
let array: ArrayRef = Arc::new(builder.finish());

Ok(ColumnarValue::Array(array))
};

create_udf(
"pg_get_constraintdef",
vec![DataType::Int32],
"pg_partition_ancestors",
vec![DataType::Utf8],
DataType::Utf8,
Volatility::Stable,
Arc::new(func),
Expand Down Expand Up @@ -1425,6 +1480,7 @@ where
session_context.register_udf(create_pg_total_relation_size_udf());
session_context.register_udf(create_pg_stat_get_numscans());
session_context.register_udf(create_pg_get_constraintdef());
session_context.register_udf(create_pg_get_partition_ancestors_udf());

Ok(())
}
Expand Down
104 changes: 104 additions & 0 deletions datafusion-postgres/tests/psql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,110 @@ const PSQL_QUERIES: &[&str] = &[
CASE WHEN pg_catalog.array_length(d.datacl, 1) = 0 THEN '(none)' ELSE pg_catalog.array_to_string(d.datacl, E'\n') END AS "Access privileges"
FROM pg_catalog.pg_database d
ORDER BY 1;"#,

// Queries from describing a table, for example `\d customer`

r#"SELECT c.oid,
n.nspname,
c.relname
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname OPERATOR(pg_catalog.~) '^(customer)$' COLLATE pg_catalog.default
AND pg_catalog.pg_table_is_visible(c.oid)
ORDER BY 2, 3;"#,

r#"SELECT a.attname,
pg_catalog.format_type(a.atttypid, a.atttypmod),
(SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid, true)
FROM pg_catalog.pg_attrdef d
WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef),
a.attnotnull,
(SELECT c.collname FROM pg_catalog.pg_collation c, pg_catalog.pg_type t
WHERE c.oid = a.attcollation AND t.oid = a.atttypid AND a.attcollation <> t.typcollation) AS attcollation,
a.attidentity,
a.attgenerated
FROM pg_catalog.pg_attribute a
WHERE a.attrelid = '16417' AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum;"#,


r#"SELECT true as sametable, conname,
pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
conrelid::pg_catalog.regclass AS ontable
FROM pg_catalog.pg_constraint r
WHERE r.conrelid = '16417' AND r.contype = 'f'
AND conparentid = 0
ORDER BY conname;"#,

r#"SELECT conname, conrelid::pg_catalog.regclass AS ontable,
pg_catalog.pg_get_constraintdef(oid, true) AS condef
FROM pg_catalog.pg_constraint c
WHERE confrelid IN (SELECT pg_catalog.pg_partition_ancestors('16417')
UNION ALL VALUES ('16417'::pg_catalog.regclass))
AND contype = 'f' AND conparentid = 0
ORDER BY conname;"#,

r#"SELECT pol.polname, pol.polpermissive,
CASE WHEN pol.polroles = '{0}' THEN NULL ELSE pg_catalog.array_to_string(array(select rolname from pg_catalog.pg_roles where oid = any (pol.polroles) order by 1),',') END,
pg_catalog.pg_get_expr(pol.polqual, pol.polrelid),
pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid),
CASE pol.polcmd
WHEN 'r' THEN 'SELECT'
WHEN 'a' THEN 'INSERT'
WHEN 'w' THEN 'UPDATE'
WHEN 'd' THEN 'DELETE'
END AS cmd
FROM pg_catalog.pg_policy pol
WHERE pol.polrelid = '16417' ORDER BY 1;"#,

r#"SELECT oid, stxrelid::pg_catalog.regclass, stxnamespace::pg_catalog.regnamespace::pg_catalog.text AS nsp, stxname,
pg_catalog.pg_get_statisticsobjdef_columns(oid) AS columns,
'd' = any(stxkind) AS ndist_enabled,
'f' = any(stxkind) AS deps_enabled,
'm' = any(stxkind) AS mcv_enabled,
stxstattarget
FROM pg_catalog.pg_statistic_ext
WHERE stxrelid = '16417'
ORDER BY nsp, stxname;"#,

r#"SELECT pubname
, NULL
, NULL
FROM pg_catalog.pg_publication p
JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid
JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid
WHERE pc.oid ='16417' and pg_catalog.pg_relation_is_publishable('16417')
UNION
SELECT pubname
, pg_get_expr(pr.prqual, c.oid)
, (CASE WHEN pr.prattrs IS NOT NULL THEN
(SELECT string_agg(attname, ', ')
FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,
pg_catalog.pg_attribute
WHERE attrelid = pr.prrelid AND attnum = prattrs[s])
ELSE NULL END) FROM pg_catalog.pg_publication p
JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid
JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid
WHERE pr.prrelid = '16417'
UNION
SELECT pubname
, NULL
, NULL
FROM pg_catalog.pg_publication p
WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('16417')
ORDER BY 1;"#,

r#"SELECT c.oid::pg_catalog.regclass
FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i
WHERE c.oid = i.inhparent AND i.inhrelid = '16417'
AND c.relkind != 'p' AND c.relkind != 'I'
ORDER BY inhseqno;"#,

r#"SELECT c.oid::pg_catalog.regclass, c.relkind, inhdetachpending, pg_catalog.pg_get_expr(c.relpartbound, c.oid)
FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i
WHERE c.oid = i.inhrelid AND i.inhparent = '16417'
ORDER BY pg_catalog.pg_get_expr(c.relpartbound, c.oid) = 'DEFAULT', c.oid::pg_catalog.regclass::pg_catalog.text;"#,

];

#[tokio::test]
Expand Down
Loading