Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion-postgres-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let session_config = SessionConfig::new().with_information_schema(true);
let session_context = SessionContext::new_with_config(session_config);
let auth_manager = Arc::new(AuthManager::new());

// TODO: remove or replace AuthManager for pg_catalog
let auth_manager = Arc::new(AuthManager::new());
setup_session_context(&session_context, &opts, Arc::clone(&auth_manager)).await?;

let server_options = ServerOptions::new()
Expand All @@ -208,7 +209,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_tls_cert_path(opts.tls_cert)
.with_tls_key_path(opts.tls_key);

serve(Arc::new(session_context), &server_options, auth_manager)
serve(Arc::new(session_context), &server_options)
.await
.map_err(|e| format!("Failed to run server: {e}"))?;

Expand Down
20 changes: 5 additions & 15 deletions datafusion-postgres/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use pgwire::api::{ClientInfo, ErrorHandler, PgWireServerHandlers, Type};
use pgwire::error::{PgWireError, PgWireResult};
use pgwire::types::format::FormatOptions;

use crate::auth::AuthManager;
use crate::client;
use crate::hooks::permissions::PermissionsHook;
use crate::hooks::set_show::SetShowHook;
use crate::hooks::transactions::TransactionStatementHook;
use crate::hooks::QueryHook;
Expand All @@ -33,7 +31,6 @@ use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type};
use datafusion_pg_catalog::sql::PostgresCompatibilityParser;

/// Simple startup handler that does no authentication
/// For production, use DfAuthSource with proper pgwire authentication handlers
pub struct SimpleStartupHandler;

#[async_trait::async_trait]
Expand All @@ -44,9 +41,8 @@ pub struct HandlerFactory {
}

impl HandlerFactory {
pub fn new(session_context: Arc<SessionContext>, auth_manager: Arc<AuthManager>) -> Self {
let session_service =
Arc::new(DfSessionService::new(session_context, auth_manager.clone()));
pub fn new(session_context: Arc<SessionContext>) -> Self {
let session_service = Arc::new(DfSessionService::new(session_context));
HandlerFactory { session_service }
}

Expand Down Expand Up @@ -99,15 +95,9 @@ pub struct DfSessionService {
}

impl DfSessionService {
pub fn new(
session_context: Arc<SessionContext>,
auth_manager: Arc<AuthManager>,
) -> DfSessionService {
let hooks: Vec<Arc<dyn QueryHook>> = vec![
Arc::new(PermissionsHook::new(auth_manager)),
Arc::new(SetShowHook),
Arc::new(TransactionStatementHook),
];
pub fn new(session_context: Arc<SessionContext>) -> DfSessionService {
let hooks: Vec<Arc<dyn QueryHook>> =
vec![Arc::new(SetShowHook), Arc::new(TransactionStatementHook)];
Self::new_with_hooks(session_context, hooks)
}

Expand Down
31 changes: 6 additions & 25 deletions datafusion-postgres/src/hooks/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ impl PermissionsHook {
let query_trimmed = query_lower.trim();

let (required_permission, resource) = if query_trimmed.starts_with("select") {
(Permission::Select, self.extract_table_from_query(query))
(Permission::Select, ResourceType::All)
} else if query_trimmed.starts_with("insert") {
(Permission::Insert, self.extract_table_from_query(query))
(Permission::Insert, ResourceType::All)
} else if query_trimmed.starts_with("update") {
(Permission::Update, self.extract_table_from_query(query))
(Permission::Update, ResourceType::All)
} else if query_trimmed.starts_with("delete") {
(Permission::Delete, self.extract_table_from_query(query))
(Permission::Delete, ResourceType::All)
} else if query_trimmed.starts_with("create table")
|| query_trimmed.starts_with("create view")
{
(Permission::Create, ResourceType::All)
} else if query_trimmed.starts_with("drop") {
(Permission::Drop, self.extract_table_from_query(query))
(Permission::Drop, ResourceType::All)
} else if query_trimmed.starts_with("alter") {
(Permission::Alter, self.extract_table_from_query(query))
(Permission::Alter, ResourceType::All)
} else {
// For other queries (SHOW, EXPLAIN, etc.), allow all users
return Ok(());
Expand All @@ -78,25 +78,6 @@ impl PermissionsHook {

Ok(())
}

/// Extract table name from query (simplified parsing)
fn extract_table_from_query(&self, query: &str) -> ResourceType {
let words: Vec<&str> = query.split_whitespace().collect();

// Simple heuristic to find table names
for (i, word) in words.iter().enumerate() {
let word_lower = word.to_lowercase();
if (word_lower == "from" || word_lower == "into" || word_lower == "table")
&& i + 1 < words.len()
{
let table_name = words[i + 1].trim_matches(|c| c == '(' || c == ')' || c == ';');
return ResourceType::Table(table_name.to_string());
}
}

// If we can't determine the table, default to All
ResourceType::All
}
}

#[async_trait]
Expand Down
4 changes: 1 addition & 3 deletions datafusion-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use tokio::sync::Semaphore;
use tokio_rustls::rustls::{self, ServerConfig};
use tokio_rustls::TlsAcceptor;

use crate::auth::AuthManager;
use handlers::HandlerFactory;
pub use handlers::{DfSessionService, Parser};
pub use hooks::QueryHook;
Expand Down Expand Up @@ -86,10 +85,9 @@ fn setup_tls(cert_path: &str, key_path: &str) -> Result<TlsAcceptor, IOError> {
pub async fn serve(
session_context: Arc<SessionContext>,
opts: &ServerOptions,
auth_manager: Arc<AuthManager>,
) -> Result<(), std::io::Error> {
// Create the handler factory with authentication
let factory = Arc::new(HandlerFactory::new(session_context, auth_manager));
let factory = Arc::new(HandlerFactory::new(session_context));

serve_with_handlers(factory, opts).await
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion-postgres/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn setup_handlers() -> DfSessionService {
)
.expect("Failed to setup sesession context");

DfSessionService::new(Arc::new(session_context), Arc::new(AuthManager::new()))
DfSessionService::new(Arc::new(session_context))
}

#[derive(Debug, Default)]
Expand Down
32 changes: 2 additions & 30 deletions tests-integration/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ wait_for_port() {
local port=$1
local timeout=30
local count=0

# Use netstat as fallback if lsof is not available
while (lsof -Pi :$port -sTCP:LISTEN -t >/dev/null 2>&1) || (netstat -ln 2>/dev/null | grep ":$port " >/dev/null 2>&1); do
if [ $count -ge $timeout ]; then
Expand Down Expand Up @@ -120,32 +120,6 @@ fi
kill -9 $PARQUET_PID 2>/dev/null || true
sleep 3

# Test 4: Role-Based Access Control
echo ""
echo "🔐 Test 4: Role-Based Access Control (RBAC)"
echo "--------------------------------------------"
wait_for_port 5435
../target/debug/datafusion-postgres-cli -p 5435 --csv delhi:delhiclimate.csv &
RBAC_PID=$!
sleep 5

# Check if server is actually running
if ! ps -p $RBAC_PID > /dev/null 2>&1; then
echo "❌ RBAC server failed to start"
exit 1
fi

if python3 test_rbac.py; then
echo "✅ RBAC test passed"
else
echo "❌ RBAC test failed"
kill -9 $RBAC_PID 2>/dev/null || true
exit 1
fi

kill -9 $RBAC_PID 2>/dev/null || true
sleep 3

# Test 5: SSL/TLS Security
echo ""
echo "🔒 Test 5: SSL/TLS Security Features"
Expand Down Expand Up @@ -177,12 +151,10 @@ echo "=========================================="
echo ""
echo "📈 Test Summary:"
echo " ✅ Enhanced CSV data loading with PostgreSQL compatibility"
echo " ✅ Complete transaction support (BEGIN/COMMIT/ROLLBACK)"
echo " ✅ Complete transaction support (BEGIN/COMMIT/ROLLBACK)"
echo " ✅ Enhanced Parquet data loading with advanced data types"
echo " ✅ Array types and complex data type support"
echo " ✅ Improved pg_catalog system tables"
echo " ✅ PostgreSQL function compatibility"
echo " ✅ Role-based access control (RBAC)"
echo " ✅ SSL/TLS encryption support"
echo ""
echo "🚀 Ready for secure production PostgreSQL workloads!"
159 changes: 0 additions & 159 deletions tests-integration/test_rbac.py

This file was deleted.

Loading