Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 146 additions & 3 deletions src/metadata_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,37 @@
//! This module provides the `MetadataWriter` trait for writing metadata to DuckLake catalogs,
//! along with helper types for column definitions and data file registration.

use crate::Result;
use crate::{DuckLakeError, Result};

/// Maximum allowed length for catalog entity names (schemas, tables, columns).
pub const MAX_NAME_LENGTH: usize = 1024;

/// Validate a catalog entity name (schema, table, or column).
///
/// Rejects names that are:
/// - Empty or whitespace-only
/// - Contain ASCII control characters (0x00-0x1F, 0x7F)
/// - Exceed [`MAX_NAME_LENGTH`] characters
pub fn validate_name(name: &str, kind: &str) -> Result<()> {
if name.trim().is_empty() {
return Err(DuckLakeError::InvalidConfig(format!(
"{kind} name cannot be empty or whitespace-only"
)));
}
if let Some(pos) = name.find(|c: char| c.is_ascii_control()) {
let byte = name.as_bytes()[pos];
return Err(DuckLakeError::InvalidConfig(format!(
"{kind} name contains control character 0x{byte:02X} at position {pos}"
)));
}
if name.len() > MAX_NAME_LENGTH {
return Err(DuckLakeError::InvalidConfig(format!(
"{kind} name exceeds maximum length of {MAX_NAME_LENGTH} characters (got {})",
name.len()
)));
}
Ok(())
}

/// Write mode for table operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -55,12 +85,14 @@ impl ColumnDef {
ducklake_type: impl Into<String>,
is_nullable: bool,
) -> Result<Self> {
let name = name.into();
validate_name(&name, "Column")?;
let ducklake_type = ducklake_type.into();
// Validate the type string by attempting to convert it to an Arrow type.
// We discard the result; we only care that the conversion succeeds.
ducklake_to_arrow_type(&ducklake_type)?;
Ok(Self {
name: name.into(),
name,
ducklake_type,
is_nullable,
})
Expand All @@ -76,11 +108,13 @@ impl ColumnDef {
data_type: &DataType,
is_nullable: bool,
) -> Result<Self> {
let name = name.into();
validate_name(&name, "Column")?;
let ducklake_type = arrow_to_ducklake_type(data_type)?;
// We use direct struct construction here since the ducklake_type was just
// produced by arrow_to_ducklake_type, so it is guaranteed to be valid.
Ok(Self {
name: name.into(),
name,
ducklake_type,
is_nullable,
})
Expand Down Expand Up @@ -300,4 +334,113 @@ mod tests {
let file = DataFileInfo::new("/absolute/path.parquet", 1024, 100).with_absolute_path();
assert!(!file.path_is_relative);
}

#[test]
fn test_column_def_empty_name_rejected() {
let result = ColumnDef::new("", "int32", true);
assert!(result.is_err());
match result {
Err(DuckLakeError::InvalidConfig(msg)) => {
assert!(msg.contains("empty"), "Expected 'empty' in: {msg}");
},
other => panic!("Expected InvalidConfig, got {:?}", other),
}
}

#[test]
fn test_column_def_control_char_name_rejected() {
let result = ColumnDef::new("col\0name", "int32", true);
assert!(result.is_err());
match result {
Err(DuckLakeError::InvalidConfig(msg)) => {
assert!(
msg.contains("control character"),
"Expected 'control character' in: {msg}"
);
},
other => panic!("Expected InvalidConfig, got {:?}", other),
}
}

#[test]
fn test_column_def_from_arrow_empty_name_rejected() {
let result = ColumnDef::from_arrow("", &DataType::Int64, false);
assert!(result.is_err());
match result {
Err(DuckLakeError::InvalidConfig(msg)) => {
assert!(msg.contains("empty"), "Expected 'empty' in: {msg}");
},
other => panic!("Expected InvalidConfig, got {:?}", other),
}
}

#[test]
fn test_column_def_from_arrow_control_char_rejected() {
let result = ColumnDef::from_arrow("col\nnewline", &DataType::Int64, false);
assert!(result.is_err());
match result {
Err(DuckLakeError::InvalidConfig(msg)) => {
assert!(
msg.contains("control character"),
"Expected 'control character' in: {msg}"
);
},
other => panic!("Expected InvalidConfig, got {:?}", other),
}
}

#[test]
fn test_validate_name_valid() {
assert!(validate_name("users", "Table").is_ok());
assert!(validate_name("my_column", "Column").is_ok());
assert!(validate_name("Schema123", "Schema").is_ok());
assert!(validate_name("a", "Column").is_ok());
}

#[test]
fn test_validate_name_empty() {
let result = validate_name("", "Table");
assert!(result.is_err());
let result = validate_name(" ", "Table");
assert!(result.is_err());
}

#[test]
fn test_validate_name_control_chars() {
// Null byte
assert!(validate_name("col\0", "Column").is_err());
// Newline
assert!(validate_name("col\n", "Column").is_err());
// Tab
assert!(validate_name("col\t", "Column").is_err());
// DEL (0x7F)
assert!(validate_name("col\x7F", "Column").is_err());
}

#[test]
fn test_validate_name_length_limit() {
// Exactly at limit should succeed
let at_limit = "a".repeat(MAX_NAME_LENGTH);
assert!(validate_name(&at_limit, "Table").is_ok());

// One over should fail
let over_limit = "a".repeat(MAX_NAME_LENGTH + 1);
assert!(validate_name(&over_limit, "Table").is_err());
}

#[test]
fn test_column_def_long_name_rejected() {
let long_name = "x".repeat(MAX_NAME_LENGTH + 1);
let result = ColumnDef::new(long_name, "int32", true);
assert!(result.is_err());
match result {
Err(DuckLakeError::InvalidConfig(msg)) => {
assert!(
msg.contains("exceeds maximum length"),
"Expected 'exceeds maximum length' in: {msg}"
);
},
other => panic!("Expected InvalidConfig, got {:?}", other),
}
}
}
65 changes: 64 additions & 1 deletion src/metadata_writer_sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::Result;
use crate::metadata_provider::block_on;
use crate::metadata_writer::{
ColumnDef, DataFileInfo, MetadataWriter, WriteMode, WriteSetupResult,
ColumnDef, DataFileInfo, MetadataWriter, WriteMode, WriteSetupResult, validate_name,
};
use sqlx::Row;
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
Expand Down Expand Up @@ -133,6 +133,7 @@ impl MetadataWriter for SqliteMetadataWriter {
path: Option<&str>,
snapshot_id: i64,
) -> Result<(i64, bool)> {
validate_name(name, "Schema")?;
block_on(async {
let existing = sqlx::query(
"SELECT schema_id FROM ducklake_schema
Expand Down Expand Up @@ -168,6 +169,7 @@ impl MetadataWriter for SqliteMetadataWriter {
path: Option<&str>,
snapshot_id: i64,
) -> Result<(i64, bool)> {
validate_name(name, "Table")?;
block_on(async {
let existing = sqlx::query(
"SELECT table_id FROM ducklake_table
Expand Down Expand Up @@ -333,6 +335,8 @@ impl MetadataWriter for SqliteMetadataWriter {
columns: &[ColumnDef],
mode: WriteMode,
) -> Result<WriteSetupResult> {
validate_name(schema_name, "Schema")?;
validate_name(table_name, "Table")?;
if columns.is_empty() {
return Err(crate::DuckLakeError::InvalidConfig(
"Table must have at least one column".to_string(),
Expand Down Expand Up @@ -660,4 +664,63 @@ mod tests {
let path2 = writer.get_data_path().unwrap();
assert_eq!(path2, "/new/path");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_get_or_create_schema_empty_name_rejected() {
let (writer, _temp) = create_test_writer().await;
let snapshot_id = writer.create_snapshot().unwrap();
let result = writer.get_or_create_schema("", None, snapshot_id);
assert!(result.is_err());
let err_msg = format!("{}", result.unwrap_err());
assert!(err_msg.contains("empty"), "Expected 'empty' in: {err_msg}");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_get_or_create_schema_control_char_rejected() {
let (writer, _temp) = create_test_writer().await;
let snapshot_id = writer.create_snapshot().unwrap();
let result = writer.get_or_create_schema("bad\0schema", None, snapshot_id);
assert!(result.is_err());
let err_msg = format!("{}", result.unwrap_err());
assert!(
err_msg.contains("control character"),
"Expected 'control character' in: {err_msg}"
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_begin_write_transaction_empty_schema_name_rejected() {
let (writer, _temp) = create_test_writer().await;
let columns = vec![ColumnDef::new("id", "int64", false).unwrap()];
let result = writer.begin_write_transaction("", "table", &columns, WriteMode::Replace);
assert!(result.is_err());
let err_msg = format!("{}", result.unwrap_err());
assert!(err_msg.contains("empty"), "Expected 'empty' in: {err_msg}");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_begin_write_transaction_empty_table_name_rejected() {
let (writer, _temp) = create_test_writer().await;
let columns = vec![ColumnDef::new("id", "int64", false).unwrap()];
let result = writer.begin_write_transaction("main", "", &columns, WriteMode::Replace);
assert!(result.is_err());
let err_msg = format!("{}", result.unwrap_err());
assert!(err_msg.contains("empty"), "Expected 'empty' in: {err_msg}");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_begin_write_transaction_control_char_names_rejected() {
let (writer, _temp) = create_test_writer().await;
let columns = vec![ColumnDef::new("id", "int64", false).unwrap()];

// Control char in schema name
let result =
writer.begin_write_transaction("bad\nschema", "table", &columns, WriteMode::Replace);
assert!(result.is_err());

// Control char in table name
let result =
writer.begin_write_transaction("main", "bad\ttable", &columns, WriteMode::Replace);
assert!(result.is_err());
}
}
21 changes: 14 additions & 7 deletions src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@ use crate::path_resolver::resolve_path;
use crate::table::DuckLakeTable;

#[cfg(feature = "write")]
use crate::metadata_writer::{ColumnDef, MetadataWriter, WriteMode};
use crate::metadata_writer::{ColumnDef, MetadataWriter, WriteMode, validate_name};
#[cfg(feature = "write")]
use datafusion::error::DataFusionError;

/// Validate table name to prevent path traversal attacks.
/// Validate table name to prevent path traversal attacks and reject
/// empty, control-character, or overlength names.
///
/// Table names are used to construct file paths, so we must ensure they
/// don't contain path separators or parent directory references.
#[cfg(feature = "write")]
fn validate_table_name(name: &str) -> DataFusionResult<()> {
if name.is_empty() {
return Err(DataFusionError::Plan(
"Table name cannot be empty".to_string(),
));
}
// Shared name validation (empty, control chars, length)
validate_name(name, "Table").map_err(|e| DataFusionError::External(Box::new(e)))?;
if name.contains('/') || name.contains('\\') || name.contains("..") {
return Err(DataFusionError::Plan(format!(
"Invalid table name '{}': must not contain path separators or '..'",
Expand Down Expand Up @@ -270,4 +269,12 @@ mod tests {
assert!(validate_table_name("..").is_err());
assert!(validate_table_name("...").is_err());
}

#[test]
fn test_validate_table_name_control_chars() {
assert!(validate_table_name("table\0name").is_err());
assert!(validate_table_name("table\nname").is_err());
assert!(validate_table_name("table\tname").is_err());
assert!(validate_table_name("\x7Ftable").is_err());
}
}
Loading