diff --git a/Cargo.lock b/Cargo.lock index 0b42141..df15bc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,9 +295,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.43" +version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fac4744fb15ae8337dc853fee7fb3f4e48c0fbaa23d0afe49c447b4fab126118" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", "js-sys", @@ -1506,15 +1506,14 @@ dependencies = [ [[package]] name = "pg_walstream" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6af2b7580eba9cde84a0a7e539fd5eeac7f1c726ab7d9ed5181b49f4bc3f38a7" +checksum = "c6227c83c13542bbfa8936e5a9f8f48c94c256cc6304b67decac921c4e645fab" dependencies = [ "bytes", "chrono", "libpq-sys", "serde", - "serde_json", "thiserror 2.0.18", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 3352a78..bd1f937 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ thiserror = "2.0.12" lazy_static = "1.5" tracing = "0.1.41" tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } -pg_walstream = "0.4.1" +pg_walstream = "0.5.1" tiberius = { version = "0.12.3", features = ["tds73", "sql-browser-tokio", "bigdecimal", "rust_decimal", "time", "chrono"] } sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "mysql", "sqlite", "chrono", "uuid"] } flate2 = "1.1.5" diff --git a/pg2any-lib/src/client.rs b/pg2any-lib/src/client.rs index c6d0f52..dd280a1 100644 --- a/pg2any-lib/src/client.rs +++ b/pg2any-lib/src/client.rs @@ -1236,15 +1236,6 @@ impl CdcClient { .shared_lsn_feedback .log_state("Final shutdown - LSN state before ACK"); - if let Err(e) = stream.send_feedback() { - warn!("Failed to send final feedback: {}", e); - } - - info!( - "Stopping logical replication stream (last received LSN: {})", - pg_walstream::format_lsn(stream.current_lsn()) - ); - if let Err(e) = stream.stop().await { error!("Failed to stop replication stream: {}", e); return Err(CdcError::from(e)); diff --git a/pg2any-lib/src/lib.rs b/pg2any-lib/src/lib.rs index eb72a21..0ace78d 100644 --- a/pg2any-lib/src/lib.rs +++ b/pg2any-lib/src/lib.rs @@ -92,7 +92,6 @@ pub mod destinations; pub use pg_walstream::{ // Type aliases and utilities format_lsn, - format_postgres_timestamp, // Protocol types message_types, parse_lsn, diff --git a/pg2any-lib/src/transaction_manager.rs b/pg2any-lib/src/transaction_manager.rs index ebf1b30..ff6e726 100644 --- a/pg2any-lib/src/transaction_manager.rs +++ b/pg2any-lib/src/transaction_manager.rs @@ -44,6 +44,7 @@ use crate::storage::{CompressionIndex, SqlStreamParser, StorageFactory, Transact use crate::types::{ChangeEvent, DestinationType, EventType, Lsn, ReplicaIdentity, RowData}; use async_compression::tokio::bufread::GzipDecoder; use chrono::{DateTime, Utc}; +use pg_walstream::ColumnValue; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::{Path, PathBuf}; @@ -1046,16 +1047,13 @@ impl TransactionManager { /// Generate INSERT SQL command /// - /// Accepts `&RowData` directly. Converts to HashMap internally because - /// column iteration is required (RowData does not yet expose `iter()`). + /// Accepts `&RowData` directly. Uses `iter()` for column iteration. fn generate_insert_sql(&self, schema: &str, table: &str, new_data: &RowData) -> Result { let schema = self.map_schema(Some(schema)); let (columns, values): (Vec, Vec) = new_data - .clone() - .into_hash_map() - .into_iter() - .map(|(k, v)| (k, self.format_value(&v))) + .iter() + .map(|(k, v)| (k.to_string(), self.format_value(v))) .unzip(); let sql = match self.destination_type { @@ -1114,9 +1112,7 @@ impl TransactionManager { ) -> Result { let schema = self.map_schema(Some(schema)); - // Convert to HashMap for SET clause iteration (RowData lacks iter()) - let new_data_map = new_data.clone().into_hash_map(); - let set_clause: Vec = new_data_map + let set_clause: Vec = new_data .iter() .map(|(col, val)| { let formatted_col = match self.destination_type { @@ -1259,13 +1255,11 @@ impl TransactionManager { .collect::>>()? } ReplicaIdentity::Full => { - // Use all columns from old data — must convert to HashMap for iteration + // Use all columns from old data let data = old_data.ok_or_else(|| { CdcError::Generic("FULL replica identity requires old data".to_string()) })?; - let data_map = data.clone().into_hash_map(); - data_map - .iter() + data.iter() .map(|(col, val)| { let formatted_col = match self.destination_type { DestinationType::MySQL => format!("`{col}`"), @@ -1290,27 +1284,27 @@ impl TransactionManager { Ok(conditions.join(" AND ")) } - /// Format a JSON value as SQL literal - fn format_value(&self, value: &serde_json::Value) -> String { + /// Format a `ColumnValue` as a SQL literal + fn format_value(&self, value: &ColumnValue) -> String { match value { - serde_json::Value::Null => "NULL".to_string(), - serde_json::Value::Bool(b) => { - if *b { - "TRUE".to_string() - } else { - "FALSE".to_string() + ColumnValue::Null => "NULL".to_string(), + ColumnValue::Text(b) => { + match std::str::from_utf8(b) { + Ok(s) => { + // Escape single quotes by doubling them (SQL standard) + let escaped = s.replace('\'', "''"); + format!("'{escaped}'") + } + Err(_) => { + // Non-UTF-8 text: hex-encode as binary literal + let hex: String = b.iter().map(|byte| format!("{byte:02x}")).collect(); + format!("X'{hex}'") + } } } - serde_json::Value::Number(n) => n.to_string(), - serde_json::Value::String(s) => { - // Escape single quotes by doubling them (SQL standard) - let escaped = s.replace('\'', "''"); - format!("'{escaped}'") - } - serde_json::Value::Array(_) | serde_json::Value::Object(_) => { - // For complex types, serialize as JSON string - let json_str = value.to_string().replace('\'', "''"); - format!("'{json_str}'") + ColumnValue::Binary(b) => { + let hex: String = b.iter().map(|byte| format!("{byte:02x}")).collect(); + format!("X'{hex}'") } } } diff --git a/pg2any-lib/tests/destination_integration_tests.rs b/pg2any-lib/tests/destination_integration_tests.rs index 02baae6..b20749a 100644 --- a/pg2any-lib/tests/destination_integration_tests.rs +++ b/pg2any-lib/tests/destination_integration_tests.rs @@ -3,7 +3,7 @@ use pg2any_lib::{ types::{ChangeEvent, EventType, ReplicaIdentity}, DestinationType, }; -use pg_walstream::{Lsn, RowData}; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; /// Test that destination handlers have consistent interfaces @@ -103,9 +103,9 @@ fn test_unsupported_destination_types() { // Helper functions to create test events fn create_test_event() -> ChangeEvent { let data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("test".to_string())), - ("active", serde_json::Value::Bool(true)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("test")), + ("active", ColumnValue::text("t")), ]); ChangeEvent::insert("public", "test_table", 456, data, Lsn::from(100)) @@ -113,13 +113,13 @@ fn create_test_event() -> ChangeEvent { fn create_update_event() -> ChangeEvent { let old_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("old_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("old_name")), ]); let new_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("new_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("new_name")), ]); ChangeEvent::update( @@ -136,11 +136,8 @@ fn create_update_event() -> ChangeEvent { fn create_delete_event() -> ChangeEvent { let old_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ( - "name", - serde_json::Value::String("deleted_name".to_string()), - ), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("deleted_name")), ]); ChangeEvent::delete( @@ -156,8 +153,8 @@ fn create_delete_event() -> ChangeEvent { fn create_update_event_without_old_data() -> ChangeEvent { let new_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("new_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("new_name")), ]); ChangeEvent::update( @@ -198,18 +195,12 @@ fn test_mysql_destination_update_with_old_data() { // Verify that old_data contains key information for WHERE clause assert!(old_data.get("id").is_some()); - assert_eq!( - old_data.get("id").unwrap(), - &serde_json::Value::Number(serde_json::Number::from(1)) - ); + assert_eq!(old_data.get("id").unwrap(), "1"); // Verify that new_data contains updated information assert!(new_data.get("id").is_some()); assert!(new_data.get("name").is_some()); - assert_eq!( - new_data.get("name").unwrap(), - &serde_json::Value::String("new_name".to_string()) - ); + assert_eq!(new_data.get("name").unwrap(), "new_name"); } else { panic!("Expected Update event"); } @@ -254,18 +245,12 @@ fn test_sqlserver_destination_update_with_old_data() { // Verify that old_data contains key information for WHERE clause assert!(old_data.get("id").is_some()); - assert_eq!( - old_data.get("id").unwrap(), - &serde_json::Value::Number(serde_json::Number::from(1)) - ); + assert_eq!(old_data.get("id").unwrap(), "1"); // Verify that new_data contains updated information assert!(new_data.get("id").is_some()); assert!(new_data.get("name").is_some()); - assert_eq!( - new_data.get("name").unwrap(), - &serde_json::Value::String("new_name".to_string()) - ); + assert_eq!(new_data.get("name").unwrap(), "new_name"); } _ => panic!("Expected Update event"), } @@ -281,10 +266,7 @@ fn test_delete_event_uses_old_data() { EventType::Delete { old_data, .. } => { assert!(old_data.get("id").is_some()); assert!(old_data.get("name").is_some()); - assert_eq!( - old_data.get("name").unwrap(), - &serde_json::Value::String("deleted_name".to_string()) - ); + assert_eq!(old_data.get("name").unwrap(), "deleted_name"); } _ => panic!("Expected Delete event"), } diff --git a/pg2any-lib/tests/event_type_refactor_tests.rs b/pg2any-lib/tests/event_type_refactor_tests.rs index 001cec9..b8fbeba 100644 --- a/pg2any-lib/tests/event_type_refactor_tests.rs +++ b/pg2any-lib/tests/event_type_refactor_tests.rs @@ -1,12 +1,14 @@ use chrono::Utc; use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; -use serde_json::json; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; #[test] fn test_new_event_type_insert() { - let data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("test"))]); + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("test")), + ]); let event = ChangeEvent::insert("public", "users", 123, data.clone(), Lsn::from(100)); @@ -28,9 +30,15 @@ fn test_new_event_type_insert() { #[test] fn test_new_event_type_update() { - let old_data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("old_name"))]); + let old_data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("old_name")), + ]); - let new_data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("new_name"))]); + let new_data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("new_name")), + ]); let event = ChangeEvent::update( "public", @@ -68,7 +76,10 @@ fn test_new_event_type_update() { #[test] fn test_new_event_type_delete() { - let old_data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("deleted_name"))]); + let old_data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("deleted_name")), + ]); let event = ChangeEvent::delete( "public", @@ -145,7 +156,10 @@ fn test_new_event_type_truncate() { #[test] fn test_event_serialization() { - let data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("test"))]); + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("test")), + ]); let event = ChangeEvent::insert("public", "users", 123, data, Lsn::from(100)); diff --git a/pg2any-lib/tests/metrics_logical_tests.rs b/pg2any-lib/tests/metrics_logical_tests.rs index fea5c5a..85095d4 100644 --- a/pg2any-lib/tests/metrics_logical_tests.rs +++ b/pg2any-lib/tests/metrics_logical_tests.rs @@ -10,6 +10,7 @@ use pg2any_lib::monitoring::{ ProcessingTimerTrait, }; use pg2any_lib::types::{ChangeEvent, ReplicaIdentity}; +use pg_walstream::ColumnValue; use pg_walstream::Lsn; use pg_walstream::RowData; use std::sync::Arc; @@ -18,8 +19,8 @@ use std::time::Duration; /// Helper function to create test change events fn create_test_insert_event() -> ChangeEvent { let data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("test".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("test")), ]); ChangeEvent::insert("public", "users", 12345, data, Lsn::from(100)) @@ -27,13 +28,13 @@ fn create_test_insert_event() -> ChangeEvent { fn create_test_update_event() -> ChangeEvent { let old_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("old_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("old_name")), ]); let new_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("new_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("new_name")), ]); ChangeEvent::update( @@ -50,11 +51,8 @@ fn create_test_update_event() -> ChangeEvent { fn create_test_delete_event() -> ChangeEvent { let old_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ( - "name", - serde_json::Value::String("deleted_name".to_string()), - ), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("deleted_name")), ]); ChangeEvent::delete( diff --git a/pg2any-lib/tests/mysql_edge_cases_tests.rs b/pg2any-lib/tests/mysql_edge_cases_tests.rs index a9cef7c..920a839 100644 --- a/pg2any-lib/tests/mysql_edge_cases_tests.rs +++ b/pg2any-lib/tests/mysql_edge_cases_tests.rs @@ -1,8 +1,7 @@ /// Integration tests for MySQL destination error handling and edge cases /// These tests verify error conditions and edge cases in the WHERE clause generation use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; -use serde_json::Value; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; /// Test scenarios that should result in errors when processed by MySQL destination @@ -13,8 +12,7 @@ mod mysql_error_scenarios { #[test] fn test_missing_key_column_scenario() { // Create event where key column is missing from data - let incomplete_data = - RowData::from_pairs(vec![("name", Value::String("test".to_string()))]); + let incomplete_data = RowData::from_pairs(vec![("name", ColumnValue::text("test"))]); // Missing "id" which is specified as key column let event = ChangeEvent::update( @@ -100,7 +98,7 @@ mod mysql_error_scenarios { #[test] fn test_empty_key_columns_scenario() { // Test scenario with empty key columns (NOTHING replica identity) - let data = RowData::from_pairs(vec![("some_data", Value::String("value".to_string()))]); + let data = RowData::from_pairs(vec![("some_data", ColumnValue::text("value"))]); let event = ChangeEvent::update( "public", @@ -124,7 +122,7 @@ mod mysql_error_scenarios { fn test_partial_key_columns_missing() { // Test composite key where some key columns are missing let incomplete_data = - RowData::from_pairs(vec![("tenant_id", Value::String("tenant_1".to_string()))]); + RowData::from_pairs(vec![("tenant_id", ColumnValue::text("tenant_1"))]); // Missing "user_id" which is part of composite key let event = ChangeEvent::update( @@ -160,13 +158,13 @@ mod data_source_selection_tests { fn test_data_source_priority_order() { // Test that old_data is always preferred over new_data when available let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("value", Value::String("old_value".to_string())), + ("id", ColumnValue::text("1")), + ("value", ColumnValue::text("old_value")), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("value", Value::String("new_value".to_string())), + ("id", ColumnValue::text("1")), + ("value", ColumnValue::text("new_value")), ]); let event = ChangeEvent::update( @@ -193,11 +191,11 @@ mod data_source_selection_tests { // Verify old_data is selected despite new_data being available assert_eq!( data_source.get("value"), - Some(&Value::String("old_value".to_string())) + Some(&ColumnValue::text("old_value")) ); assert_ne!( data_source.get("value"), - Some(&Value::String("new_value".to_string())) + Some(&ColumnValue::text("new_value")) ); } _ => panic!("Expected Update event"), @@ -208,8 +206,8 @@ mod data_source_selection_tests { fn test_fallback_only_when_old_data_none() { // Test that fallback to new_data only happens when old_data is None let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("value", Value::String("fallback_value".to_string())), + ("id", ColumnValue::text("1")), + ("value", ColumnValue::text("fallback_value")), ]); let event = ChangeEvent::update( @@ -235,7 +233,7 @@ mod data_source_selection_tests { // Verify new_data is used when old_data is None assert_eq!( data_source.get("value"), - Some(&Value::String("fallback_value".to_string())) + Some(&ColumnValue::text("fallback_value")) ); } _ => panic!("Expected Update event"), @@ -251,25 +249,16 @@ mod complex_data_tests { #[test] fn test_complex_json_data_in_where_clause() { // Test with complex JSON data structures - let complex_json = Value::Object(serde_json::Map::from_iter([( - "nested".to_string(), - Value::Object(serde_json::Map::from_iter([( - "array".to_string(), - Value::Array(vec![ - Value::Number(serde_json::Number::from(1)), - Value::String("text".to_string()), - ]), - )])), - )])); + let complex_json_str = r#"{"nested":{"array":[1,"text"]}}"#; let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("json_data", complex_json.clone()), + ("id", ColumnValue::text("1")), + ("json_data", ColumnValue::text(complex_json_str)), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("json_data", complex_json), + ("id", ColumnValue::text("1")), + ("json_data", ColumnValue::text(complex_json_str)), ]); let event = ChangeEvent::update( @@ -290,7 +279,7 @@ mod complex_data_tests { assert!(data_source.get("json_data").is_some()); assert!(matches!( data_source.get("json_data"), - Some(Value::Object(_)) + Some(ColumnValue::Text(_)) )); } _ => panic!("Expected Update event"), @@ -301,10 +290,10 @@ mod complex_data_tests { fn test_various_data_types_in_key_columns() { // Test with different PostgreSQL data types in key columns let old_data = RowData::from_pairs(vec![ - ("string_key", Value::String("key_value".to_string())), - ("int_key", Value::Number(serde_json::Number::from(42))), - ("bool_key", Value::Bool(true)), - ("null_key", Value::Null), + ("string_key", ColumnValue::text("key_value")), + ("int_key", ColumnValue::text("42")), + ("bool_key", ColumnValue::text("true")), + ("null_key", ColumnValue::Null), ]); let event = ChangeEvent::update( @@ -330,14 +319,20 @@ mod complex_data_tests { // Verify all data types are preserved assert!(matches!( data_source.get("string_key"), - Some(Value::String(_)) + Some(ColumnValue::Text(_)) + )); + assert!(matches!( + data_source.get("int_key"), + Some(ColumnValue::Text(_)) )); - assert!(matches!(data_source.get("int_key"), Some(Value::Number(_)))); assert!(matches!( data_source.get("bool_key"), - Some(Value::Bool(true)) + Some(ColumnValue::Text(_)) + )); + assert!(matches!( + data_source.get("null_key"), + Some(ColumnValue::Null) )); - assert!(matches!(data_source.get("null_key"), Some(Value::Null))); } _ => panic!("Expected Update event"), } @@ -365,8 +360,7 @@ mod schema_table_tests { ]; for (schema, table) in test_cases { - let data = - RowData::from_pairs(vec![("id", Value::Number(serde_json::Number::from(1)))]); + let data = RowData::from_pairs(vec![("id", ColumnValue::text("1"))]); let event = ChangeEvent::update( schema.to_string(), diff --git a/pg2any-lib/tests/mysql_error_handling_simple_tests.rs b/pg2any-lib/tests/mysql_error_handling_simple_tests.rs index b8009b2..f67f8d1 100644 --- a/pg2any-lib/tests/mysql_error_handling_simple_tests.rs +++ b/pg2any-lib/tests/mysql_error_handling_simple_tests.rs @@ -2,9 +2,9 @@ /// These tests verify the specific error conditions and messages that should be /// generated by the build_where_clause method in MySQL destination use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; +use pg_walstream::ColumnValue; use pg_walstream::Lsn; use pg_walstream::RowData; -use serde_json::Value; use std::sync::Arc; /// Tests for error conditions in WHERE clause generation @@ -19,8 +19,7 @@ mod where_clause_error_tests { // in the MySQL destination's build_where_clause method // Test Case 1: Missing key column in data - let incomplete_data = - RowData::from_pairs(vec![("name", Value::String("test".to_string()))]); + let incomplete_data = RowData::from_pairs(vec![("name", ColumnValue::text("test"))]); // Missing "id" which is the key column let event_missing_key = ChangeEvent::update( @@ -51,7 +50,7 @@ mod where_clause_error_tests { fn test_replica_identity_nothing_error_conditions() { // Test error conditions specific to NOTHING replica identity - let new_data = RowData::from_pairs(vec![("data", Value::String("some data".to_string()))]); + let new_data = RowData::from_pairs(vec![("data", ColumnValue::text("some data"))]); // UPDATE with NOTHING replica identity let update_event = ChangeEvent::update( diff --git a/pg2any-lib/tests/mysql_where_clause_fix_tests.rs b/pg2any-lib/tests/mysql_where_clause_fix_tests.rs index b4eabd2..e57b3a8 100644 --- a/pg2any-lib/tests/mysql_where_clause_fix_tests.rs +++ b/pg2any-lib/tests/mysql_where_clause_fix_tests.rs @@ -2,22 +2,21 @@ /// These tests verify the critical bug fix where UPDATE/DELETE operations /// now correctly use old_data when available, falling back to new_data when needed use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; -use serde_json::Value; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; #[test] fn test_update_uses_old_data_for_where_clause() { // This test verifies the core fix: UPDATE should use old_data for WHERE clause let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(42))), - ("version", Value::Number(serde_json::Number::from(1))), + ("id", ColumnValue::text("42")), + ("version", ColumnValue::text("1")), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(42))), - ("version", Value::Number(serde_json::Number::from(2))), // Version incremented - ("name", Value::String("Updated Name".to_string())), + ("id", ColumnValue::text("42")), + ("version", ColumnValue::text("2")), // Version incremented + ("name", ColumnValue::text("Updated Name")), ]); let event = ChangeEvent::update( @@ -43,10 +42,7 @@ fn test_update_uses_old_data_for_where_clause() { }; // Verify old_data is selected and contains replica identity values - assert_eq!( - data_source.get("version"), - Some(&Value::Number(serde_json::Number::from(1))) - ); + assert_eq!(data_source.get("version"), Some(&ColumnValue::text("1"))); assert!(data_source.get("id").is_some()); // WHERE clause should use version=1 (old), not version=2 (new) @@ -60,8 +56,8 @@ fn test_update_uses_old_data_for_where_clause() { fn test_update_fallback_to_new_data_when_old_data_none() { // Test the fallback behavior when old_data is None let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(100))), - ("status", Value::String("active".to_string())), + ("id", ColumnValue::text("100")), + ("status", ColumnValue::text("active")), ]); let event = ChangeEvent::update( @@ -86,13 +82,10 @@ fn test_update_fallback_to_new_data_when_old_data_none() { }; // Verify fallback works - assert_eq!( - data_source.get("id"), - Some(&Value::Number(serde_json::Number::from(100))) - ); + assert_eq!(data_source.get("id"), Some(&ColumnValue::text("100"))); assert_eq!( data_source.get("status"), - Some(&Value::String("active".to_string())) + Some(&ColumnValue::text("active")) ); } _ => panic!("Expected Update event"), @@ -103,8 +96,8 @@ fn test_update_fallback_to_new_data_when_old_data_none() { fn test_delete_always_uses_old_data() { // DELETE operations should always have old_data available let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(999))), - ("email", Value::String("delete@example.com".to_string())), + ("id", ColumnValue::text("999")), + ("email", ColumnValue::text("delete@example.com")), ]); let event = ChangeEvent::delete( @@ -120,13 +113,10 @@ fn test_delete_always_uses_old_data() { // For DELETE, old_data should always be available match &event.event_type { EventType::Delete { old_data, .. } => { - assert_eq!( - old_data.get("id"), - Some(&Value::Number(serde_json::Number::from(999))) - ); + assert_eq!(old_data.get("id"), Some(&ColumnValue::text("999"))); assert_eq!( old_data.get("email"), - Some(&Value::String("delete@example.com".to_string())) + Some(&ColumnValue::text("delete@example.com")) ); } _ => panic!("Expected Delete event"), @@ -137,15 +127,15 @@ fn test_delete_always_uses_old_data() { fn test_composite_key_data_source_selection() { // Test with composite primary key let old_data = RowData::from_pairs(vec![ - ("tenant_id", Value::String("tenant_1".to_string())), - ("user_id", Value::Number(serde_json::Number::from(42))), - ("balance", Value::Number(serde_json::Number::from(100))), + ("tenant_id", ColumnValue::text("tenant_1")), + ("user_id", ColumnValue::text("42")), + ("balance", ColumnValue::text("100")), ]); let new_data = RowData::from_pairs(vec![ - ("tenant_id", Value::String("tenant_1".to_string())), - ("user_id", Value::Number(serde_json::Number::from(42))), - ("balance", Value::Number(serde_json::Number::from(150))), // Updated + ("tenant_id", ColumnValue::text("tenant_1")), + ("user_id", ColumnValue::text("42")), + ("balance", ColumnValue::text("150")), // Updated ]); let event = ChangeEvent::update( @@ -172,12 +162,9 @@ fn test_composite_key_data_source_selection() { // Both key columns should be available in old_data assert_eq!( data_source.get("tenant_id"), - Some(&Value::String("tenant_1".to_string())) - ); - assert_eq!( - data_source.get("user_id"), - Some(&Value::Number(serde_json::Number::from(42))) + Some(&ColumnValue::text("tenant_1")) ); + assert_eq!(data_source.get("user_id"), Some(&ColumnValue::text("42"))); // WHERE clause: WHERE tenant_id='tenant_1' AND user_id=42 } @@ -189,15 +176,15 @@ fn test_composite_key_data_source_selection() { fn test_replica_identity_full_uses_all_columns() { // Test FULL replica identity with all columns in WHERE clause let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("name", Value::String("John".to_string())), - ("age", Value::Number(serde_json::Number::from(25))), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John")), + ("age", ColumnValue::text("25")), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("name", Value::String("Johnny".to_string())), // Changed - ("age", Value::Number(serde_json::Number::from(26))), // Changed + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Johnny")), // Changed + ("age", ColumnValue::text("26")), // Changed ]); let event = ChangeEvent::update( @@ -216,14 +203,8 @@ fn test_replica_identity_full_uses_all_columns() { let data_source = old_data.as_ref().unwrap(); // All columns available in old_data for FULL replica identity - assert_eq!( - data_source.get("name"), - Some(&Value::String("John".to_string())) - ); - assert_eq!( - data_source.get("age"), - Some(&Value::Number(serde_json::Number::from(25))) - ); + assert_eq!(data_source.get("name"), Some(&ColumnValue::text("John"))); + assert_eq!(data_source.get("age"), Some(&ColumnValue::text("25"))); // WHERE: WHERE id=1 AND name='John' AND age=25 (old values) // SET: SET id=1, name='Johnny', age=26 (new values) @@ -234,15 +215,15 @@ fn test_replica_identity_full_uses_all_columns() { #[test] fn test_json_null_values_in_replica_identity() { - // Test handling of JSON null values + // Test handling of null values let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("optional_field", Value::Null), + ("id", ColumnValue::text("1")), + ("optional_field", ColumnValue::Null), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("optional_field", Value::String("now has value".to_string())), + ("id", ColumnValue::text("1")), + ("optional_field", ColumnValue::text("now has value")), ]); let event = ChangeEvent::update( @@ -261,7 +242,7 @@ fn test_json_null_values_in_replica_identity() { let data_source = old_data.as_ref().unwrap(); // Null values should be preserved in old_data - assert_eq!(data_source.get("optional_field"), Some(&Value::Null)); + assert_eq!(data_source.get("optional_field"), Some(&ColumnValue::Null)); // WHERE clause should handle: WHERE id=1 AND optional_field IS NULL } @@ -273,8 +254,8 @@ fn test_json_null_values_in_replica_identity() { fn test_key_columns_availability() { // Test that key columns are correctly accessible let data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("name", Value::String("Test".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Test")), ]); // Test DEFAULT replica identity (primary key only) diff --git a/pg2any-lib/tests/replica_identity_tests.rs b/pg2any-lib/tests/replica_identity_tests.rs index 70b4c62..17d18e0 100644 --- a/pg2any-lib/tests/replica_identity_tests.rs +++ b/pg2any-lib/tests/replica_identity_tests.rs @@ -1,18 +1,17 @@ use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; -use serde_json::Value; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; #[tokio::test] async fn test_replica_identity_default_with_primary_key() { let new_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("John".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("John")), ]); let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("Jane".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("Jane")), ]); let event = ChangeEvent::update( @@ -41,15 +40,15 @@ async fn test_replica_identity_default_with_primary_key() { #[tokio::test] async fn test_replica_identity_full() { let new_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("John".to_string())), - ("email", Value::String("john@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("John")), + ("email", ColumnValue::text("john@example.com")), ]); let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("Jane".to_string())), - ("email", Value::String("jane@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("Jane")), + ("email", ColumnValue::text("jane@example.com")), ]); let event = ChangeEvent::update( @@ -76,14 +75,14 @@ async fn test_replica_identity_full() { #[tokio::test] async fn test_replica_identity_index() { let new_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("John".to_string())), - ("email", Value::String("john@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("John")), + ("email", ColumnValue::text("john@example.com")), ]); let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("email", Value::String("jane@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("email", ColumnValue::text("jane@example.com")), ]); let event = ChangeEvent::update( @@ -110,8 +109,8 @@ async fn test_replica_identity_index() { #[tokio::test] async fn test_replica_identity_nothing() { let new_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("John".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("John")), ]); let event = ChangeEvent::update( @@ -136,8 +135,8 @@ async fn test_replica_identity_nothing() { #[tokio::test] async fn test_delete_with_replica_identity_default() { let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("Jane".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("Jane")), ]); let event = ChangeEvent::delete( @@ -164,9 +163,9 @@ async fn test_delete_with_replica_identity_default() { #[tokio::test] async fn test_delete_with_replica_identity_full() { let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("Jane".to_string())), - ("email", Value::String("jane@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("Jane")), + ("email", ColumnValue::text("jane@example.com")), ]); let event = ChangeEvent::delete( diff --git a/pg2any-lib/tests/sqlite_comprehensive_tests.rs b/pg2any-lib/tests/sqlite_comprehensive_tests.rs index 6b9c382..ce450e8 100644 --- a/pg2any-lib/tests/sqlite_comprehensive_tests.rs +++ b/pg2any-lib/tests/sqlite_comprehensive_tests.rs @@ -4,8 +4,7 @@ use pg2any_lib::{ types::{ChangeEvent, DestinationType, ReplicaIdentity}, Transaction, }; -use pg_walstream::{Lsn, RowData}; -use serde_json::json; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -27,9 +26,8 @@ fn event_to_sql(event: &ChangeEvent) -> Option { use pg2any_lib::types::EventType; match &event.event_type { EventType::Insert { table, data, .. } => { - let data_map = data.clone().into_hash_map(); - let cols: Vec<_> = data_map.keys().cloned().collect(); - let vals: Vec<_> = cols.iter().map(|c| format_value(&data_map[c])).collect(); + let cols: Vec = data.iter().map(|(k, _)| k.to_string()).collect(); + let vals: Vec = data.iter().map(|(_, v)| format_column_value(v)).collect(); Some(format!( "INSERT INTO \"{}\" ({}) VALUES ({});", table, @@ -47,19 +45,18 @@ fn event_to_sql(event: &ChangeEvent) -> Option { key_columns, .. } => { - let new_data_map = new_data.clone().into_hash_map(); - let set: Vec<_> = new_data_map + let set: Vec = new_data .iter() - .map(|(k, v)| format!("\"{}\" = {}", k, format_value(v))) + .map(|(k, v)| format!("\"{}\" = {}", k, format_column_value(v))) .collect(); - let cond: Vec<_> = key_columns + let cond: Vec = key_columns .iter() .filter_map(|k| { old_data .as_ref() .and_then(|d| d.get(k.as_ref())) .or_else(|| new_data.get(k.as_ref())) - .map(|v| format!("\"{}\" = {}", k, format_value(v))) + .map(|v| format!("\"{}\" = {}", k, format_column_value(v))) }) .collect(); if cond.is_empty() { @@ -78,12 +75,12 @@ fn event_to_sql(event: &ChangeEvent) -> Option { key_columns, .. } => { - let cond: Vec<_> = key_columns + let cond: Vec = key_columns .iter() .filter_map(|k| { old_data .get(k.as_ref()) - .map(|v| format!("\"{}\" = {}", k, format_value(v))) + .map(|v| format!("\"{}\" = {}", k, format_column_value(v))) }) .collect(); if cond.is_empty() { @@ -99,13 +96,31 @@ fn event_to_sql(event: &ChangeEvent) -> Option { } } -fn format_value(v: &serde_json::Value) -> String { - match v { - serde_json::Value::Null => "NULL".to_string(), - serde_json::Value::Bool(b) => if *b { "1" } else { "0" }.to_string(), - serde_json::Value::Number(n) => n.to_string(), - serde_json::Value::String(s) => format!("'{}'", s.replace("'", "''")), - _ => format!("'{}'", v.to_string().replace("'", "''")), +fn format_column_value(value: &ColumnValue) -> String { + match value { + ColumnValue::Null => "NULL".to_string(), + ColumnValue::Text(b) => match std::str::from_utf8(b) { + Ok(s) => { + if s.parse::().is_ok() || s.parse::().is_ok() { + s.to_string() + } else if s == "true" { + "1".to_string() + } else if s == "false" { + "0".to_string() + } else { + format!("'{}'", s.replace('\'', "''")) + } + } + Err(_) => "NULL".to_string(), + }, + ColumnValue::Binary(b) => { + format!( + "X'{}'", + b.iter() + .map(|byte| format!("{:02x}", byte)) + .collect::() + ) + } } } @@ -218,9 +233,9 @@ async fn test_sqlite_empty_string_handling() { // Test empty string insertion let data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("text_field", json!("")), - ("nullable_field", json!("")), + ("id", ColumnValue::text("1")), + ("text_field", ColumnValue::text("")), + ("nullable_field", ColumnValue::text("")), ]); let event = ChangeEvent::insert("main", "comprehensive_test", 123, data, Lsn::from(100)); @@ -263,11 +278,11 @@ async fn test_sqlite_null_value_handling() { // Test null value insertion let data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("text_field", json!("test")), - ("nullable_field", json!(null)), - ("int_field", json!(null)), - ("real_field", json!(null)), + ("id", ColumnValue::text("1")), + ("text_field", ColumnValue::text("test")), + ("nullable_field", ColumnValue::Null), + ("int_field", ColumnValue::Null), + ("real_field", ColumnValue::Null), ]); let event = ChangeEvent::insert("main", "comprehensive_test", 123, data, Lsn::from(100)); @@ -315,14 +330,14 @@ async fn test_sqlite_unicode_and_special_characters() { // Test unicode and special character insertion let data = RowData::from_pairs(vec![ - ("id", json!(1)), + ("id", ColumnValue::text("1")), ( "text_field", - json!("🚀 Hello 世界! Special chars: áéíóú ñüç"), + ColumnValue::text("🚀 Hello 世界! Special chars: áéíóú ñüç"), ), ( "json_field", - json!("{\"emoji\": \"😀\", \"chinese\": \"你好\"}"), + ColumnValue::text("{\"emoji\": \"😀\", \"chinese\": \"你好\"}"), ), ]); @@ -370,17 +385,16 @@ async fn test_sqlite_large_data_handling() { // Create large text data (1MB) let large_text = "A".repeat(1024 * 1024); - let large_json = json!({ - "data": "B".repeat(500000), - "nested": { - "more_data": "C".repeat(500000) - } - }); + let large_json = format!( + "{{\"data\": \"{}\", \"nested\": {{\"more_data\": \"{}\"}}}}", + "B".repeat(500000), + "C".repeat(500000) + ); let data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("text_field", json!(large_text)), - ("json_field", large_json), + ("id", ColumnValue::text("1")), + ("text_field", ColumnValue::text(&large_text)), + ("json_field", ColumnValue::text(&large_json)), ]); let event = ChangeEvent::insert("main", "comprehensive_test", 123, data, Lsn::from(100)); @@ -423,16 +437,19 @@ async fn test_sqlite_numeric_precision() { create_comprehensive_test_table(&pool).await.unwrap(); // Test various numeric types and precision - let test_cases = vec![ - (1, json!(9223372036854775807i64)), // i64 max - (2, json!(-9223372036854775808i64)), // i64 min - (3, json!(3.141592653589793)), // High precision float - (4, json!(1.7976931348623157e308)), // Large float - (5, json!(2.2250738585072014e-308)), // Small float + let test_cases: Vec<(i32, ColumnValue)> = vec![ + (1, ColumnValue::text("9223372036854775807")), // i64 max + (2, ColumnValue::text("-9223372036854775808")), // i64 min + (3, ColumnValue::text("3.141592653589793")), // High precision float + (4, ColumnValue::text("1.7976931348623157e308")), // Large float + (5, ColumnValue::text("2.2250738585072014e-308")), // Small float ]; - for (id, value) in test_cases { - let data = RowData::from_pairs(vec![("id", json!(id)), ("real_field", value.clone())]); + for (id, value) in &test_cases { + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text(&id.to_string())), + ("real_field", value.clone()), + ]); let event = ChangeEvent::insert("main", "comprehensive_test", 123, data, Lsn::from(100)); let result = destination @@ -496,8 +513,8 @@ async fn test_sqlite_constraint_violations() { // Insert initial data let data1 = RowData::from_pairs(vec![ - ("id", json!(1)), - ("unique_field", json!("unique_value")), + ("id", ColumnValue::text("1")), + ("unique_field", ColumnValue::text("unique_value")), ]); let event1 = ChangeEvent::insert("main", "comprehensive_test", 123, data1, Lsn::from(100)); @@ -511,8 +528,8 @@ async fn test_sqlite_constraint_violations() { // Try to insert duplicate unique value - should fail let data2 = RowData::from_pairs(vec![ - ("id", json!(2)), - ("unique_field", json!("unique_value")), + ("id", ColumnValue::text("2")), + ("unique_field", ColumnValue::text("unique_value")), ]); let event2 = ChangeEvent::insert("main", "comprehensive_test", 124, data2, Lsn::from(100)); @@ -546,7 +563,10 @@ async fn test_sqlite_missing_key_columns_error() { create_comprehensive_test_table(&pool).await.unwrap(); // Insert initial data - let data = RowData::from_pairs(vec![("id", json!(1)), ("text_field", json!("test"))]); + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("text_field", ColumnValue::text("test")), + ]); let event = ChangeEvent::insert( "main", @@ -613,12 +633,18 @@ async fn test_sqlite_bulk_operations_performance() { // Bulk INSERT operations for i in 0..batch_size { let data = RowData::from_pairs(vec![ - ("id", json!(i)), - ("name", json!(format!("User {}", i))), - ("age", json!(20 + (i % 50))), - ("email", json!(format!("user{}@example.com", i))), - ("active", json!(true)), - ("salary", json!(50000.0 + (i as f64 * 10.0))), + ("id", ColumnValue::text(&i.to_string())), + ("name", ColumnValue::text(&format!("User {}", i))), + ("age", ColumnValue::text(&(20 + (i % 50)).to_string())), + ( + "email", + ColumnValue::text(&format!("user{}@example.com", i)), + ), + ("active", ColumnValue::text("true")), + ( + "salary", + ColumnValue::text(&format!("{}", 50000.0 + (i as f64 * 10.0))), + ), ]); let event = ChangeEvent::insert("main", "users", 123 + i as u32, data, Lsn::from(100)); @@ -648,15 +674,21 @@ async fn test_sqlite_bulk_operations_performance() { // Bulk UPDATE operations let update_start = Instant::now(); for i in 0..batch_size / 2 { - let old_data = RowData::from_pairs(vec![("id", json!(i))]); + let old_data = RowData::from_pairs(vec![("id", ColumnValue::text(&i.to_string()))]); let new_data = RowData::from_pairs(vec![ - ("id", json!(i)), - ("name", json!(format!("Updated User {}", i))), - ("age", json!(25 + (i % 50))), - ("email", json!(format!("updated_user{}@example.com", i))), - ("active", json!(true)), - ("salary", json!(60000.0 + (i as f64 * 15.0))), + ("id", ColumnValue::text(&i.to_string())), + ("name", ColumnValue::text(&format!("Updated User {}", i))), + ("age", ColumnValue::text(&(25 + (i % 50)).to_string())), + ( + "email", + ColumnValue::text(&format!("updated_user{}@example.com", i)), + ), + ("active", ColumnValue::text("true")), + ( + "salary", + ColumnValue::text(&format!("{}", 60000.0 + (i as f64 * 15.0))), + ), ]); let event = ChangeEvent::update( @@ -728,15 +760,15 @@ async fn test_sqlite_concurrent_operations() { for i in 0..15 { let record_id = session_id * 1000 + i; let data = RowData::from_pairs(vec![ - ("id", json!(record_id)), + ("id", ColumnValue::text(&record_id.to_string())), ( "name", - json!(format!("Session {} Record {}", session_id, i)), + ColumnValue::text(&format!("Session {} Record {}", session_id, i)), ), - ("age", json!(25 + i % 30)), + ("age", ColumnValue::text(&(25 + i % 30).to_string())), ( "email", - json!(format!("session{}record{}@example.com", session_id, i)), + ColumnValue::text(&format!("session{}record{}@example.com", session_id, i)), ), ]); @@ -815,9 +847,9 @@ async fn test_sqlite_transaction_events() { // Insert some data between transaction events let data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("Transaction Test")), - ("email", json!("txn@example.com")), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Transaction Test")), + ("email", ColumnValue::text("txn@example.com")), ]); let insert_event = ChangeEvent::insert("main", "users", 105, data, Lsn::from(100)); @@ -901,7 +933,10 @@ async fn test_sqlite_connection_recovery() { .unwrap(); create_users_table(&pool).await.unwrap(); - let values = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("Alice"))]); + let values = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Alice")), + ]); let event = ChangeEvent::insert("public", "users", 1, values, Lsn::from(100)); let transaction = wrap_in_transaction(event); let process_result = destination @@ -918,7 +953,10 @@ async fn test_sqlite_connection_recovery() { assert!(reconnect_result.is_ok()); // Verify connection works again - let values2 = RowData::from_pairs(vec![("id", json!(2)), ("name", json!("Bob"))]); + let values2 = RowData::from_pairs(vec![ + ("id", ColumnValue::text("2")), + ("name", ColumnValue::text("Bob")), + ]); let event2 = ChangeEvent::insert("public", "users", 2, values2, Lsn::from(100)); let transaction2 = wrap_in_transaction(event2); let process_result2 = destination @@ -1007,12 +1045,12 @@ async fn test_sqlite_complete_crud_cycle() { // 1. CREATE (INSERT) let create_data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Doe")), - ("age", json!(30)), - ("email", json!("john@example.com")), - ("active", json!(true)), - ("salary", json!(50000.0)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Doe")), + ("age", ColumnValue::text("30")), + ("email", ColumnValue::text("john@example.com")), + ("active", ColumnValue::text("true")), + ("salary", ColumnValue::text("50000")), ]); let create_event = ChangeEvent::insert("main", "users", 1, create_data.clone(), Lsn::from(100)); @@ -1044,12 +1082,12 @@ async fn test_sqlite_complete_crud_cycle() { // 3. UPDATE let update_data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Smith")), - ("age", json!(31)), - ("email", json!("john.smith@example.com")), - ("active", json!(true)), - ("salary", json!(55000.0)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Smith")), + ("age", ColumnValue::text("31")), + ("email", ColumnValue::text("john.smith@example.com")), + ("active", ColumnValue::text("true")), + ("salary", ColumnValue::text("55000")), ]); let update_event = ChangeEvent::update( @@ -1086,7 +1124,7 @@ async fn test_sqlite_complete_crud_cycle() { assert_eq!(updated_row.get::("salary"), 55000.0); // 4. DELETE - let delete_data = RowData::from_pairs(vec![("id", json!(1))]); + let delete_data = RowData::from_pairs(vec![("id", ColumnValue::text("1"))]); let delete_event = ChangeEvent::delete( "main", @@ -1137,7 +1175,10 @@ async fn test_sqlite_destination_factory_integration() { .unwrap(); create_users_table(&pool).await.unwrap(); - let data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("Factory Test"))]); + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Factory Test")), + ]); let event = ChangeEvent::insert("main", "users", 1, data, Lsn::from(100)); let process_result = destination diff --git a/pg2any-lib/tests/sqlite_destination_tests.rs b/pg2any-lib/tests/sqlite_destination_tests.rs index 41a9bf9..d299883 100644 --- a/pg2any-lib/tests/sqlite_destination_tests.rs +++ b/pg2any-lib/tests/sqlite_destination_tests.rs @@ -4,8 +4,7 @@ use pg2any_lib::{ types::{ChangeEvent, DestinationType, EventType, ReplicaIdentity}, Transaction, }; -use pg_walstream::{Lsn, RowData}; -use serde_json::json; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::fs; @@ -32,12 +31,8 @@ fn transaction_to_sql_commands(tx: &Transaction) -> Vec { fn event_to_sql(event: &ChangeEvent) -> Option { match &event.event_type { EventType::Insert { table, data, .. } => { - let data_map = data.clone().into_hash_map(); - let columns: Vec = data_map.keys().cloned().collect(); - let values: Vec = columns - .iter() - .map(|col| format_value(&data_map[col])) - .collect(); + let columns: Vec = data.iter().map(|(k, _)| k.to_string()).collect(); + let values: Vec = data.iter().map(|(_, v)| format_column_value(v)).collect(); Some(format!( "INSERT INTO \"{}\" ({}) VALUES ({});", table, @@ -57,10 +52,9 @@ fn event_to_sql(event: &ChangeEvent) -> Option { replica_identity, .. } => { - let new_data_map = new_data.clone().into_hash_map(); - let set_clause: Vec = new_data_map + let set_clause: Vec = new_data .iter() - .map(|(col, val)| format!("\"{}\" = {}", col, format_value(val))) + .map(|(col, val)| format!("\"{}\" = {}", col, format_column_value(val))) .collect(); // For REPLICA IDENTITY FULL with empty key_columns, use all old_data columns @@ -68,7 +62,7 @@ fn event_to_sql(event: &ChangeEvent) -> Option { if key_columns.is_empty() && matches!(replica_identity, ReplicaIdentity::Full) { old_data .as_ref() - .map(|d| d.clone().into_hash_map().into_keys().collect()) + .map(|d| d.iter().map(|(k, _)| k.to_string()).collect()) .unwrap_or_default() } else { key_columns.iter().map(|k| k.to_string()).collect() @@ -82,7 +76,7 @@ fn event_to_sql(event: &ChangeEvent) -> Option { .as_ref() .and_then(|d| d.get(col.as_str())) .or_else(|| new_data.get(col.as_str())) - .map(|val| format!("\"{}\" = {}", col, format_value(val))) + .map(|val| format!("\"{}\" = {}", col, format_column_value(val))) }) .collect(); if conditions.is_empty() { @@ -112,7 +106,7 @@ fn event_to_sql(event: &ChangeEvent) -> Option { .filter_map(|col| { old_data .get(col.as_ref()) - .map(|val| format!("\"{}\" = {}", col, format_value(val))) + .map(|val| format!("\"{}\" = {}", col, format_column_value(val))) }) .collect(); if conditions.is_empty() { @@ -143,14 +137,35 @@ fn event_to_sql(event: &ChangeEvent) -> Option { } } -/// Helper function to format a JSON value as SQL literal -fn format_value(value: &serde_json::Value) -> String { +/// Helper function to format a ColumnValue as SQL literal +fn format_column_value(value: &ColumnValue) -> String { match value { - serde_json::Value::Null => "NULL".to_string(), - serde_json::Value::Bool(b) => if *b { "1" } else { "0" }.to_string(), - serde_json::Value::Number(n) => n.to_string(), - serde_json::Value::String(s) => format!("'{}'", s.replace('\'', "''")), - _ => format!("'{}'", value.to_string().replace('\'', "''")), + ColumnValue::Null => "NULL".to_string(), + ColumnValue::Text(b) => { + match std::str::from_utf8(b) { + Ok(s) => { + // Try to parse as number + if s.parse::().is_ok() || s.parse::().is_ok() { + s.to_string() + } else if s == "true" { + "1".to_string() + } else if s == "false" { + "0".to_string() + } else { + format!("'{}'", s.replace('\'', "''")) + } + } + Err(_) => "NULL".to_string(), + } + } + ColumnValue::Binary(b) => { + format!( + "X'{}'", + b.iter() + .map(|byte| format!("{:02x}", byte)) + .collect::() + ) + } } } @@ -198,24 +213,24 @@ impl Drop for TempDatabase { /// Helper function to create test data fn create_test_data() -> RowData { RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Doe")), - ("age", json!(30)), - ("email", json!("john@example.com")), - ("active", json!(true)), - ("salary", json!(50000.50)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Doe")), + ("age", ColumnValue::text("30")), + ("email", ColumnValue::text("john@example.com")), + ("active", ColumnValue::text("true")), + ("salary", ColumnValue::text("50000.50")), ]) } /// Helper function to create updated test data fn create_updated_test_data() -> RowData { RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Smith")), - ("age", json!(31)), - ("email", json!("john.smith@example.com")), - ("active", json!(true)), - ("salary", json!(55000.75)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Smith")), + ("age", ColumnValue::text("31")), + ("email", ColumnValue::text("john.smith@example.com")), + ("active", ColumnValue::text("true")), + ("salary", ColumnValue::text("55000.75")), ]) } @@ -575,15 +590,15 @@ async fn test_sqlite_destination_replica_identity_full() { // Create UPDATE event with REPLICA IDENTITY FULL let old_data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Doe")), - ("email", json!("john@example.com")), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Doe")), + ("email", ColumnValue::text("john@example.com")), ]); let new_data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Smith")), - ("email", json!("john.smith@example.com")), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Smith")), + ("email", ColumnValue::text("john.smith@example.com")), ]); let event = ChangeEvent::update( @@ -681,13 +696,13 @@ async fn test_sqlite_destination_complex_data_types() { // Create INSERT event with complex data types let data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("json_array", json!([1, 2, 3, "test"])), + ("id", ColumnValue::text("1")), + ("json_array", ColumnValue::text("[1,2,3,\"test\"]")), ( "json_object", - json!({"key": "value", "nested": {"number": 42}}), + ColumnValue::text("{\"key\":\"value\",\"nested\":{\"number\":42}}"), ), - ("null_value", json!(null)), + ("null_value", ColumnValue::Null), ]); let event = ChangeEvent::insert("main", "complex_data", 123, data, Lsn::from(100)); diff --git a/pg2any-lib/tests/where_clause_fix_tests.rs b/pg2any-lib/tests/where_clause_fix_tests.rs index 1f1c150..d3fb2fc 100644 --- a/pg2any-lib/tests/where_clause_fix_tests.rs +++ b/pg2any-lib/tests/where_clause_fix_tests.rs @@ -1,5 +1,5 @@ use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; /// Test to demonstrate the fix for UPDATE WHERE clause issue @@ -8,33 +8,15 @@ fn test_update_where_clause_uses_old_data() { // Create test event simulating PostgreSQL logical replication UPDATE // with replica identity containing old key values let old_data = RowData::from_pairs(vec![ - ( - "id", - serde_json::Value::Number(serde_json::Number::from(42)), - ), - ( - "email", - serde_json::Value::String("old.email@example.com".to_string()), - ), + ("id", ColumnValue::text("42")), + ("email", ColumnValue::text("old.email@example.com")), ]); let new_data = RowData::from_pairs(vec![ - ( - "id", - serde_json::Value::Number(serde_json::Number::from(42)), - ), - ( - "email", - serde_json::Value::String("new.email@example.com".to_string()), - ), - ( - "name", - serde_json::Value::String("Updated Name".to_string()), - ), - ( - "age", - serde_json::Value::Number(serde_json::Number::from(30)), - ), + ("id", ColumnValue::text("42")), + ("email", ColumnValue::text("new.email@example.com")), + ("name", ColumnValue::text("Updated Name")), + ("age", ColumnValue::text("30")), ]); let event = ChangeEvent::update( @@ -58,43 +40,32 @@ fn test_update_where_clause_uses_old_data() { let old_data = old_data.as_ref().unwrap(); assert!(old_data.get("id").is_some()); assert!(old_data.get("email").is_some()); - assert_eq!( - old_data.get("id").unwrap(), - &serde_json::Value::Number(serde_json::Number::from(42)) - ); - assert_eq!( - old_data.get("email").unwrap(), - &serde_json::Value::String("old.email@example.com".to_string()) - ); + assert_eq!(old_data.get("id").unwrap(), "42"); + assert_eq!(old_data.get("email").unwrap(), "old.email@example.com"); // 2. Verify new_data contains the updated values assert!(new_data.get("id").is_some()); assert!(new_data.get("email").is_some()); assert!(new_data.get("name").is_some()); assert!(new_data.get("age").is_some()); - assert_eq!( - new_data.get("email").unwrap(), - &serde_json::Value::String("new.email@example.com".to_string()) - ); + assert_eq!(new_data.get("email").unwrap(), "new.email@example.com"); // 3. Simulate the logic our fixed code should use: // - SET clause should use new_data keys/values // - WHERE clause should use old_data keys/values - let set_map = new_data.clone().into_hash_map(); - let where_map = old_data.clone().into_hash_map(); - let set_keys: Vec<&String> = set_map.keys().collect(); - let where_keys: Vec<&String> = where_map.keys().collect(); + let set_keys: Vec<&str> = new_data.iter().map(|(k, _)| k.as_ref()).collect(); + let where_keys: Vec<&str> = old_data.iter().map(|(k, _)| k.as_ref()).collect(); // Verify SET clause would include all new columns - assert!(set_keys.contains(&&"id".to_string())); - assert!(set_keys.contains(&&"email".to_string())); - assert!(set_keys.contains(&&"name".to_string())); - assert!(set_keys.contains(&&"age".to_string())); + assert!(set_keys.contains(&"id")); + assert!(set_keys.contains(&"email")); + assert!(set_keys.contains(&"name")); + assert!(set_keys.contains(&"age")); // Verify WHERE clause would use replica identity (old_data) - assert!(where_keys.contains(&&"id".to_string())); - assert!(where_keys.contains(&&"email".to_string())); + assert!(where_keys.contains(&"id")); + assert!(where_keys.contains(&"email")); // The generated SQL should conceptually be: // UPDATE `public`.`users` @@ -113,14 +84,8 @@ fn test_update_where_clause_uses_old_data() { #[test] fn test_delete_where_clause_uses_old_data() { let old_data = RowData::from_pairs(vec![ - ( - "id", - serde_json::Value::Number(serde_json::Number::from(99)), - ), - ( - "username", - serde_json::Value::String("user_to_delete".to_string()), - ), + ("id", ColumnValue::text("99")), + ("username", ColumnValue::text("user_to_delete")), ]); let event = ChangeEvent::delete( @@ -151,11 +116,8 @@ fn test_delete_where_clause_uses_old_data() { #[test] fn test_update_fallback_when_no_old_data() { let new_data = RowData::from_pairs(vec![ - ( - "id", - serde_json::Value::Number(serde_json::Number::from(123)), - ), - ("name", serde_json::Value::String("New Name".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("New Name")), ]); let event = ChangeEvent::update(