From 188cbd2164156b3190773a6c148b2ff3091ac92c Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 8 Dec 2025 17:55:17 +0800 Subject: [PATCH 1/4] refactor: drop table level permission control --- datafusion-postgres/src/hooks/permissions.rs | 31 ++++---------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/datafusion-postgres/src/hooks/permissions.rs b/datafusion-postgres/src/hooks/permissions.rs index 4d42527..ac59348 100644 --- a/datafusion-postgres/src/hooks/permissions.rs +++ b/datafusion-postgres/src/hooks/permissions.rs @@ -40,21 +40,21 @@ impl PermissionsHook { let query_trimmed = query_lower.trim(); let (required_permission, resource) = if query_trimmed.starts_with("select") { - (Permission::Select, self.extract_table_from_query(query)) + (Permission::Select, ResourceType::All) } else if query_trimmed.starts_with("insert") { - (Permission::Insert, self.extract_table_from_query(query)) + (Permission::Insert, ResourceType::All) } else if query_trimmed.starts_with("update") { - (Permission::Update, self.extract_table_from_query(query)) + (Permission::Update, ResourceType::All) } else if query_trimmed.starts_with("delete") { - (Permission::Delete, self.extract_table_from_query(query)) + (Permission::Delete, ResourceType::All) } else if query_trimmed.starts_with("create table") || query_trimmed.starts_with("create view") { (Permission::Create, ResourceType::All) } else if query_trimmed.starts_with("drop") { - (Permission::Drop, self.extract_table_from_query(query)) + (Permission::Drop, ResourceType::All) } else if query_trimmed.starts_with("alter") { - (Permission::Alter, self.extract_table_from_query(query)) + (Permission::Alter, ResourceType::All) } else { // For other queries (SHOW, EXPLAIN, etc.), allow all users return Ok(()); @@ -78,25 +78,6 @@ impl PermissionsHook { Ok(()) } - - /// Extract table name from query (simplified parsing) - fn extract_table_from_query(&self, query: &str) -> ResourceType { - let words: Vec<&str> = query.split_whitespace().collect(); - - // Simple heuristic to find table names - for (i, word) in words.iter().enumerate() { - let word_lower = word.to_lowercase(); - if (word_lower == "from" || word_lower == "into" || word_lower == "table") - && i + 1 < words.len() - { - let table_name = words[i + 1].trim_matches(|c| c == '(' || c == ')' || c == ';'); - return ResourceType::Table(table_name.to_string()); - } - } - - // If we can't determine the table, default to All - ResourceType::All - } } #[async_trait] From 743f3837a0d369109329008e5d0bf0a0bc695b09 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 8 Dec 2025 18:05:14 +0800 Subject: [PATCH 2/4] refactor!: remove permission hook and AuthManager from core path --- datafusion-postgres-cli/src/main.rs | 4 ++-- datafusion-postgres/src/handlers.rs | 19 +++++-------------- datafusion-postgres/src/lib.rs | 4 +--- datafusion-postgres/src/testing.rs | 2 +- 4 files changed, 9 insertions(+), 20 deletions(-) diff --git a/datafusion-postgres-cli/src/main.rs b/datafusion-postgres-cli/src/main.rs index 8792ce2..131e8bb 100644 --- a/datafusion-postgres-cli/src/main.rs +++ b/datafusion-postgres-cli/src/main.rs @@ -198,8 +198,8 @@ async fn main() -> Result<(), Box> { let session_config = SessionConfig::new().with_information_schema(true); let session_context = SessionContext::new_with_config(session_config); - let auth_manager = Arc::new(AuthManager::new()); + let auth_manager = Arc::new(AuthManager::new()); setup_session_context(&session_context, &opts, Arc::clone(&auth_manager)).await?; let server_options = ServerOptions::new() @@ -208,7 +208,7 @@ async fn main() -> Result<(), Box> { .with_tls_cert_path(opts.tls_cert) .with_tls_key_path(opts.tls_key); - serve(Arc::new(session_context), &server_options, auth_manager) + serve(Arc::new(session_context), &server_options) .await .map_err(|e| format!("Failed to run server: {e}"))?; diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 8fdd5e3..d18226b 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -22,9 +22,7 @@ use pgwire::api::{ClientInfo, ErrorHandler, PgWireServerHandlers, Type}; use pgwire::error::{PgWireError, PgWireResult}; use pgwire::types::format::FormatOptions; -use crate::auth::AuthManager; use crate::client; -use crate::hooks::permissions::PermissionsHook; use crate::hooks::set_show::SetShowHook; use crate::hooks::transactions::TransactionStatementHook; use crate::hooks::QueryHook; @@ -44,9 +42,8 @@ pub struct HandlerFactory { } impl HandlerFactory { - pub fn new(session_context: Arc, auth_manager: Arc) -> Self { - let session_service = - Arc::new(DfSessionService::new(session_context, auth_manager.clone())); + pub fn new(session_context: Arc) -> Self { + let session_service = Arc::new(DfSessionService::new(session_context)); HandlerFactory { session_service } } @@ -99,15 +96,9 @@ pub struct DfSessionService { } impl DfSessionService { - pub fn new( - session_context: Arc, - auth_manager: Arc, - ) -> DfSessionService { - let hooks: Vec> = vec![ - Arc::new(PermissionsHook::new(auth_manager)), - Arc::new(SetShowHook), - Arc::new(TransactionStatementHook), - ]; + pub fn new(session_context: Arc) -> DfSessionService { + let hooks: Vec> = + vec![Arc::new(SetShowHook), Arc::new(TransactionStatementHook)]; Self::new_with_hooks(session_context, hooks) } diff --git a/datafusion-postgres/src/lib.rs b/datafusion-postgres/src/lib.rs index 6996cd6..a83dfcb 100644 --- a/datafusion-postgres/src/lib.rs +++ b/datafusion-postgres/src/lib.rs @@ -21,7 +21,6 @@ use tokio::sync::Semaphore; use tokio_rustls::rustls::{self, ServerConfig}; use tokio_rustls::TlsAcceptor; -use crate::auth::AuthManager; use handlers::HandlerFactory; pub use handlers::{DfSessionService, Parser}; pub use hooks::QueryHook; @@ -86,10 +85,9 @@ fn setup_tls(cert_path: &str, key_path: &str) -> Result { pub async fn serve( session_context: Arc, opts: &ServerOptions, - auth_manager: Arc, ) -> Result<(), std::io::Error> { // Create the handler factory with authentication - let factory = Arc::new(HandlerFactory::new(session_context, auth_manager)); + let factory = Arc::new(HandlerFactory::new(session_context)); serve_with_handlers(factory, opts).await } diff --git a/datafusion-postgres/src/testing.rs b/datafusion-postgres/src/testing.rs index 76a13ef..98a1b69 100644 --- a/datafusion-postgres/src/testing.rs +++ b/datafusion-postgres/src/testing.rs @@ -21,7 +21,7 @@ pub fn setup_handlers() -> DfSessionService { ) .expect("Failed to setup sesession context"); - DfSessionService::new(Arc::new(session_context), Arc::new(AuthManager::new())) + DfSessionService::new(Arc::new(session_context)) } #[derive(Debug, Default)] From ff0adacab6e8c1a532782df2d040cba938d9b131 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 8 Dec 2025 18:10:43 +0800 Subject: [PATCH 3/4] chore: update some comments --- datafusion-postgres-cli/src/main.rs | 1 + datafusion-postgres/src/handlers.rs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-postgres-cli/src/main.rs b/datafusion-postgres-cli/src/main.rs index 131e8bb..7b0f296 100644 --- a/datafusion-postgres-cli/src/main.rs +++ b/datafusion-postgres-cli/src/main.rs @@ -199,6 +199,7 @@ async fn main() -> Result<(), Box> { let session_config = SessionConfig::new().with_information_schema(true); let session_context = SessionContext::new_with_config(session_config); + // TODO: remove or replace AuthManager for pg_catalog let auth_manager = Arc::new(AuthManager::new()); setup_session_context(&session_context, &opts, Arc::clone(&auth_manager)).await?; diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index d18226b..2f2c349 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -31,7 +31,6 @@ use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type}; use datafusion_pg_catalog::sql::PostgresCompatibilityParser; /// Simple startup handler that does no authentication -/// For production, use DfAuthSource with proper pgwire authentication handlers pub struct SimpleStartupHandler; #[async_trait::async_trait] From e547f53bbbebfd5794dd5d4a0cace0a74c16d3c4 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 8 Dec 2025 18:24:18 +0800 Subject: [PATCH 4/4] chore: remove integraion tests --- tests-integration/test.sh | 32 +------ tests-integration/test_rbac.py | 159 --------------------------------- 2 files changed, 2 insertions(+), 189 deletions(-) delete mode 100755 tests-integration/test_rbac.py diff --git a/tests-integration/test.sh b/tests-integration/test.sh index 1ec6ea4..a8caf34 100755 --- a/tests-integration/test.sh +++ b/tests-integration/test.sh @@ -20,7 +20,7 @@ wait_for_port() { local port=$1 local timeout=30 local count=0 - + # Use netstat as fallback if lsof is not available while (lsof -Pi :$port -sTCP:LISTEN -t >/dev/null 2>&1) || (netstat -ln 2>/dev/null | grep ":$port " >/dev/null 2>&1); do if [ $count -ge $timeout ]; then @@ -120,32 +120,6 @@ fi kill -9 $PARQUET_PID 2>/dev/null || true sleep 3 -# Test 4: Role-Based Access Control -echo "" -echo "šŸ” Test 4: Role-Based Access Control (RBAC)" -echo "--------------------------------------------" -wait_for_port 5435 -../target/debug/datafusion-postgres-cli -p 5435 --csv delhi:delhiclimate.csv & -RBAC_PID=$! -sleep 5 - -# Check if server is actually running -if ! ps -p $RBAC_PID > /dev/null 2>&1; then - echo "āŒ RBAC server failed to start" - exit 1 -fi - -if python3 test_rbac.py; then - echo "āœ… RBAC test passed" -else - echo "āŒ RBAC test failed" - kill -9 $RBAC_PID 2>/dev/null || true - exit 1 -fi - -kill -9 $RBAC_PID 2>/dev/null || true -sleep 3 - # Test 5: SSL/TLS Security echo "" echo "šŸ”’ Test 5: SSL/TLS Security Features" @@ -177,12 +151,10 @@ echo "==========================================" echo "" echo "šŸ“ˆ Test Summary:" echo " āœ… Enhanced CSV data loading with PostgreSQL compatibility" -echo " āœ… Complete transaction support (BEGIN/COMMIT/ROLLBACK)" +echo " āœ… Complete transaction support (BEGIN/COMMIT/ROLLBACK)" echo " āœ… Enhanced Parquet data loading with advanced data types" echo " āœ… Array types and complex data type support" echo " āœ… Improved pg_catalog system tables" echo " āœ… PostgreSQL function compatibility" -echo " āœ… Role-based access control (RBAC)" echo " āœ… SSL/TLS encryption support" echo "" -echo "šŸš€ Ready for secure production PostgreSQL workloads!" \ No newline at end of file diff --git a/tests-integration/test_rbac.py b/tests-integration/test_rbac.py deleted file mode 100755 index a2ce9bc..0000000 --- a/tests-integration/test_rbac.py +++ /dev/null @@ -1,159 +0,0 @@ -#!/usr/bin/env python3 -""" -Test Role-Based Access Control (RBAC) functionality -""" - -import psycopg -import time -import sys - -def test_rbac(): - """Test RBAC permissions and role management""" - print("šŸ” Testing Role-Based Access Control (RBAC)") - print("============================================") - - try: - # Connect as postgres (superuser) - with psycopg.connect("host=127.0.0.1 port=5435 user=postgres") as conn: - with conn.cursor() as cur: - - print("\nšŸ“‹ Test 1: Default PostgreSQL User Access") - - # Test that postgres user has full access - cur.execute("SELECT COUNT(*) FROM delhi") - count = cur.fetchone()[0] - print(f" āœ“ Postgres user SELECT access: {count} rows") - - # Test that postgres user can access system functions - try: - cur.execute("SELECT current_schema()") - schema = cur.fetchone()[0] - print(f" āœ“ Postgres user function access: current_schema = {schema}") - except Exception as e: - print(f" āš ļø Function access failed: {e}") - - print("\nšŸ” Test 2: Permission System Structure") - - # Test that the system recognizes the user - try: - cur.execute("SELECT version()") - version = cur.fetchone()[0] - print(f" āœ“ System version accessible: {version[:50]}...") - except Exception as e: - print(f" āš ļø Version query failed: {e}") - - # Test basic metadata access - try: - cur.execute("SELECT COUNT(*) FROM pg_catalog.pg_type") - type_count = cur.fetchone()[0] - print(f" āœ“ Catalog access: {type_count} types in pg_type") - except Exception as e: - print(f" āš ļø Catalog access failed: {e}") - - print("\nšŸŽÆ Test 3: Query-level Permission Checking") - - # Test different SQL operations that should work for superuser - operations = [ - ("SELECT", "SELECT COUNT(*) FROM delhi WHERE meantemp > 20"), - ("AGGREGATE", "SELECT AVG(meantemp) FROM delhi"), - ("FUNCTION", "SELECT version()"), - ] - - for op_name, query in operations: - try: - cur.execute(query) - result = cur.fetchone() - print(f" āœ“ {op_name} operation permitted: {result[0] if result else 'success'}") - except Exception as e: - print(f" āŒ {op_name} operation failed: {e}") - - print("\nšŸ“Š Test 4: Complex Query Permissions") - - # Test complex queries that involve multiple tables - complex_queries = [ - "SELECT d.date FROM delhi d LIMIT 5", - "SELECT COUNT(*) as total_records FROM delhi", - "SELECT * FROM delhi ORDER BY meantemp DESC LIMIT 3", - ] - - for i, query in enumerate(complex_queries, 1): - try: - cur.execute(query) - results = cur.fetchall() - print(f" āœ“ Complex query {i}: {len(results)} results") - except Exception as e: - print(f" āŒ Complex query {i} failed: {e}") - - print("\nšŸ” Test 5: Transaction-based Operations") - - try: - # Test transaction operations with RBAC - cur.execute("BEGIN") - cur.execute("SELECT COUNT(*) FROM delhi") - count_in_tx = cur.fetchone()[0] - cur.execute("COMMIT") - print(f" āœ“ Transaction operations: {count_in_tx} rows in transaction") - except Exception as e: - print(f" āŒ Transaction operations failed: {e}") - try: - cur.execute("ROLLBACK") - except: - pass - - print("\nšŸ—ļø Test 6: System Catalog Integration") - - # Test that RBAC doesn't interfere with system catalog queries - try: - cur.execute(""" - SELECT c.relname, c.relkind - FROM pg_catalog.pg_class c - WHERE c.relname = 'delhi' - """) - table_info = cur.fetchone() - if table_info: - print(f" āœ“ System catalog query: table '{table_info[0]}' type '{table_info[1]}'") - else: - print(" āš ļø System catalog query returned no results") - except Exception as e: - print(f" āŒ System catalog query failed: {e}") - - print("\nšŸš€ Test 7: Authentication System Validation") - - # Test that authentication manager is working - try: - # These queries should work because postgres is a superuser - validation_queries = [ - "SELECT current_schema()", - "SELECT has_table_privilege('delhi', 'SELECT')", - "SELECT version()", - ] - - for query in validation_queries: - cur.execute(query) - result = cur.fetchone()[0] - print(f" āœ“ Auth validation: {query.split('(')[0]}() = {result}") - - except Exception as e: - print(f" āš ļø Auth validation query failed: {e}") - - print("\nāœ… All RBAC tests completed!") - print("\nšŸ“ˆ RBAC Test Summary:") - print(" āœ… Default postgres superuser has full access") - print(" āœ… Permission checking system integrated") - print(" āœ… Query-level access control functional") - print(" āœ… Transaction operations work with RBAC") - print(" āœ… System catalog access preserved") - print(" āœ… Authentication system operational") - - except psycopg.Error as e: - print(f"āŒ Database connection error: {e}") - return False - except Exception as e: - print(f"āŒ Unexpected error: {e}") - return False - - return True - -if __name__ == "__main__": - success = test_rbac() - sys.exit(0 if success else 1)