From 69589e0d0ae36d367d800af0bf04688b8d6592b6 Mon Sep 17 00:00:00 2001
From: Danielshih
Date: Wed, 25 Feb 2026 16:17:48 +0000
Subject: [PATCH 1/2] upgrade pg_walstream version to 0.5.0

- Updated all test cases in replica_identity_tests.rs, sqlite_comprehensive_tests.rs, sqlite_destination_tests.rs, and where_clause_fix_tests.rs to replace serde_json::Value with pg_walstream::ColumnValue for consistency and improved type safety.
- Modified the format_value function to format ColumnValue types correctly in SQL statements.
- Introduced a new ReplicationActor to manage the LogicalReplicationStream in a dedicated thread, ensuring thread safety and proper command handling.
- Enhanced the actor's command structure to support starting the replication stream, reading events, sending feedback, and gracefully stopping the actor.
---
 Cargo.lock | 9 +-
 Cargo.toml | 2 +-
 pg2any-lib/src/client.rs | 74 +++---
 pg2any-lib/src/lib.rs | 4 +-
 pg2any-lib/src/replication_actor.rs | 237 ++++++++++++++++++
 pg2any-lib/src/transaction_manager.rs | 56 ++---
 .../tests/destination_integration_tests.rs | 52 ++--
 pg2any-lib/tests/event_type_refactor_tests.rs | 28 ++-
 pg2any-lib/tests/metrics_logical_tests.rs | 20 +-
 pg2any-lib/tests/mysql_edge_cases_tests.rs | 74 +++---
 .../mysql_error_handling_simple_tests.rs | 7 +-
 .../tests/mysql_where_clause_fix_tests.rs | 97 +++----
 pg2any-lib/tests/replica_identity_tests.rs | 47 ++--
 .../tests/sqlite_comprehensive_tests.rs | 217 +++++++++-------
 pg2any-lib/tests/sqlite_destination_tests.rs | 101 ++++----
 pg2any-lib/tests/where_clause_fix_tests.rs | 82 ++----
 16 files changed, 669 insertions(+), 438 deletions(-)
 create mode 100644 pg2any-lib/src/replication_actor.rs

diff --git a/Cargo.lock b/Cargo.lock
index 0b42141..1ac2d90 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -295,9 +295,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.43" +version = "0.4.44" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "fac4744fb15ae8337dc853fee7fb3f4e48c0fbaa23d0afe49c447b4fab126118" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", "js-sys", @@ -1506,15 +1506,14 @@ dependencies = [ [[package]] name = "pg_walstream" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6af2b7580eba9cde84a0a7e539fd5eeac7f1c726ab7d9ed5181b49f4bc3f38a7" +checksum = "7ada0907d94f64d9107377b397f5467d2af4e03a631bb9c25a9e0c33c0c5cdb5" dependencies = [ "bytes", "chrono", "libpq-sys", "serde", - "serde_json", "thiserror 2.0.18", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 3352a78..4c26683 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ thiserror = "2.0.12" lazy_static = "1.5" tracing = "0.1.41" tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } -pg_walstream = "0.4.1" +pg_walstream = "0.5.0" tiberius = { version = "0.12.3", features = ["tds73", "sql-browser-tokio", "bigdecimal", "rust_decimal", "time", "chrono"] } sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "mysql", "sqlite", "chrono", "uuid"] } flate2 = "1.1.5" diff --git a/pg2any-lib/src/client.rs b/pg2any-lib/src/client.rs index c6d0f52..5caf241 100644 --- a/pg2any-lib/src/client.rs +++ b/pg2any-lib/src/client.rs @@ -3,6 +3,7 @@ use crate::destinations::{DestinationFactory, DestinationHandler}; use crate::error::{CdcError, Result}; use crate::lsn_tracker::{LsnTracker, SharedLsnFeedback}; use crate::monitoring::{MetricsCollector, MetricsCollectorTrait}; +use crate::replication_actor::{ActorCommand, ReplicationActorHandle}; use crate::transaction_manager::{ PendingTransactionFile, TransactionFileMetadata, TransactionManager, }; @@ -11,7 +12,7 @@ use chrono::{DateTime, Utc}; use pg_walstream::{LogicalReplicationStream, ReplicationStreamConfig}; use std::collections::{BinaryHeap, HashMap}; use 
std::sync::Arc; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -77,8 +78,8 @@ pub struct CdcClient { lsn_tracker: Arc, /// Transaction file manager for file-based workflow transaction_file_manager: Arc, - /// Replication stream for PostgreSQL connection - replication_stream: Arc>, + /// Actor handle for the replication stream (Send + Sync safe) + replication_actor: ReplicationActorHandle, } impl CdcClient { @@ -127,6 +128,7 @@ impl CdcClient { let stream_config = ReplicationStreamConfig::from(&config); let replication_stream = LogicalReplicationStream::new(&config.source_connection_string, stream_config).await?; + let replication_actor = ReplicationActorHandle::spawn(replication_stream); let client = Self { config, @@ -137,7 +139,7 @@ impl CdcClient { metrics_collector: Arc::new(MetricsCollector::new()), lsn_tracker, transaction_file_manager: Arc::new(manager), - replication_stream: Arc::new(Mutex::new(replication_stream)), + replication_actor, }; Ok((client, start_lsn)) @@ -175,11 +177,7 @@ impl CdcClient { // Start the replication stream { let start_xlog = start_lsn.map(|lsn| lsn.0); - self.replication_stream - .lock() - .await - .start(start_xlog) - .await?; + self.replication_actor.start(start_xlog).await?; } // Start file-based workflow @@ -208,11 +206,8 @@ impl CdcClient { let (producer_shutdown_tx, producer_shutdown_rx) = oneshot::channel::<()>(); info!("Created producer shutdown notification channel"); - // Get shared_lsn_feedback from stored replication_stream - let shared_lsn_feedback = { - let stream_guard = self.replication_stream.as_ref().lock().await; - stream_guard.shared_lsn_feedback.clone() - }; + // Get shared_lsn_feedback from actor handle + let shared_lsn_feedback = self.replication_actor.shared_lsn_feedback.clone(); if let Some(ref mut handler) = self.destination_handler { info!("Processing pending transaction files from previous 
run (recovery)..."); @@ -235,8 +230,8 @@ impl CdcClient { } } - // Clone Arc of replication_stream for the producer - let replication_stream = self.replication_stream.clone(); + // Clone the actor command sender for the producer + let actor_cmd_tx = self.replication_actor.cmd_tx.clone(); // Start producer (writes to files only) let producer_handle = { @@ -247,7 +242,7 @@ impl CdcClient { let lsn_feedback = shared_lsn_feedback.clone(); tokio::spawn(Self::run_producer( - replication_stream, + actor_cmd_tx, token, start_lsn, metrics, @@ -472,7 +467,7 @@ impl CdcClient { /// 3. Consumer receives None from mpsc (channel closed) and processes remaining queue /// 4. Consumer then exits after draining all pending transactions async fn run_producer( - replication_stream: Arc>, + actor_cmd_tx: mpsc::Sender, cancellation_token: CancellationToken, start_lsn: Lsn, metrics_collector: Arc, @@ -537,10 +532,27 @@ impl CdcClient { } while !cancellation_token.is_cancelled() { - // Get the next event from the replication stream (lock for the duration of the call) + // Get the next event from the replication actor let event_result = { - let mut stream = replication_stream.lock().await; - stream.next_event_with_retry(&cancellation_token).await + let (tx, rx) = oneshot::channel(); + if actor_cmd_tx + .send(ActorCommand::NextEvent { + cancel: cancellation_token.clone(), + reply: tx, + }) + .await + .is_err() + { + error!("Replication actor has shut down"); + break; + } + match rx.await { + Ok(r) => r, + Err(_) => { + error!("Replication actor dropped reply"); + break; + } + } }; match event_result { @@ -1231,23 +1243,25 @@ impl CdcClient { info!("Both producer and consumer completed gracefully"); { info!("Sending final ACK to PostgreSQL before shutdown"); - let mut stream = self.replication_stream.as_ref().lock().await; - stream + self.replication_actor .shared_lsn_feedback .log_state("Final shutdown - LSN state before ACK"); - if let Err(e) = stream.send_feedback() { + if let Err(e) = 
self.replication_actor.send_feedback().await { warn!("Failed to send final feedback: {}", e); } - info!( - "Stopping logical replication stream (last received LSN: {})", - pg_walstream::format_lsn(stream.current_lsn()) - ); + match self.replication_actor.current_lsn().await { + Ok(lsn) => info!( + "Stopping logical replication stream (last received LSN: {})", + pg_walstream::format_lsn(lsn) + ), + Err(e) => warn!("Could not get current LSN: {}", e), + } - if let Err(e) = stream.stop().await { + if let Err(e) = self.replication_actor.stop().await { error!("Failed to stop replication stream: {}", e); - return Err(CdcError::from(e)); + return Err(e); } info!("Final ACK sent successfully to PostgreSQL"); diff --git a/pg2any-lib/src/lib.rs b/pg2any-lib/src/lib.rs index eb72a21..8d8ce54 100644 --- a/pg2any-lib/src/lib.rs +++ b/pg2any-lib/src/lib.rs @@ -66,6 +66,9 @@ pub mod types; pub mod lsn_tracker; +// Actor-based replication stream wrapper (Send + Sync safe) +pub mod replication_actor; + // High-level client interface pub mod client; @@ -92,7 +95,6 @@ pub mod destinations; pub use pg_walstream::{ // Type aliases and utilities format_lsn, - format_postgres_timestamp, // Protocol types message_types, parse_lsn, diff --git a/pg2any-lib/src/replication_actor.rs b/pg2any-lib/src/replication_actor.rs new file mode 100644 index 0000000..a47bec7 --- /dev/null +++ b/pg2any-lib/src/replication_actor.rs @@ -0,0 +1,237 @@ +//! Replication stream actor — owns `LogicalReplicationStream` on a dedicated +//! single-threaded runtime so the non-`Sync` libpq connection never crosses +//! thread boundaries. +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────────────────┐ mpsc::channel +//! │ ReplicationActorHandle│ ───Command──►┐ +//! │ (Send + Sync) │ │ +//! └─────────────────────────┘ ▼ +//! ┌────────────────────┐ +//! │ Dedicated Thread │ +//! │ (single-threaded │ +//! │ tokio runtime) │ +//! │ │ +//! │ LogicalReplication │ +//! │ Stream (owned) │ +//! 
└────────────────────┘ +//! │ +//! Events / Results +//! │ +//! ▼ +//! mpsc::channel / oneshot +//! ``` +//! +//! The handle is fully `Send + Sync` and can be used freely from any tokio task. + +use crate::error::{CdcError, Result}; +use pg_walstream::{LogicalReplicationStream, SharedLsnFeedback}; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info}; + +/// Re-export for convenience. +pub use pg_walstream::types::ChangeEvent; + +pub enum ActorCommand { +    /// Start the replication stream with an optional starting LSN. +    Start { +        start_lsn: Option, +        reply: oneshot::Sender>, +    }, +    /// Read the next event (with automatic retry / keepalive handling). +    NextEvent { +        cancel: CancellationToken, +        reply: oneshot::Sender>, +    }, +    /// Send feedback (standby status update) to PostgreSQL. +    SendFeedback { reply: oneshot::Sender> }, +    /// Get the current LSN position. +    CurrentLsn { reply: oneshot::Sender }, +    /// Gracefully stop the replication stream. +    Stop { reply: oneshot::Sender> }, +} + +// --------------------------------------------------------------------------- +// Actor handle — the public, Send + Sync interface +// --------------------------------------------------------------------------- + +/// A `Send + Sync` handle to the replication actor. +/// +/// All interaction with the underlying `LogicalReplicationStream` goes through message-passing, so no `Sync` bound is required on the stream itself. +pub struct ReplicationActorHandle { +    pub cmd_tx: mpsc::Sender, +    /// Thread join handle so the caller can wait for the actor to shut down. +    join_handle: Option>, +    /// Shared LSN feedback — thread-safe, can be used directly from any task. +    pub shared_lsn_feedback: Arc, +} + +impl ReplicationActorHandle { +    /// Spawn a new actor that owns the given `LogicalReplicationStream`. +    /// +    /// The actor runs on its own single-threaded tokio runtime, so libpq's `*mut PGconn` never needs to be `Send`/`Sync`. 
+ pub fn spawn(stream: LogicalReplicationStream) -> Self { + let shared_lsn_feedback = stream.shared_lsn_feedback.clone(); + // Bounded command channel — back-pressure if the actor is busy. + let (cmd_tx, cmd_rx) = mpsc::channel::(32); + + let join_handle = std::thread::Builder::new() + .name("replication-actor".into()) + .spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create actor runtime"); + + rt.block_on(actor_loop(stream, cmd_rx)); + }) + .expect("Failed to spawn replication actor thread"); + + Self { + cmd_tx, + join_handle: Some(join_handle), + shared_lsn_feedback, + } + } + + /// Start the replication stream. + pub async fn start(&self, start_lsn: Option) -> Result<()> { + let (tx, rx) = oneshot::channel(); + self.cmd_tx + .send(ActorCommand::Start { + start_lsn, + reply: tx, + }) + .await + .map_err(|_| CdcError::generic("Replication actor has shut down"))?; + rx.await + .map_err(|_| CdcError::generic("Replication actor dropped reply"))? + } + + /// Read the next replication event (blocks until one is available or cancelled). + pub async fn next_event(&self, cancel: &CancellationToken) -> Result { + let (tx, rx) = oneshot::channel(); + self.cmd_tx + .send(ActorCommand::NextEvent { + cancel: cancel.clone(), + reply: tx, + }) + .await + .map_err(|_| CdcError::generic("Replication actor has shut down"))?; + rx.await + .map_err(|_| CdcError::generic("Replication actor dropped reply"))? + } + + /// Send feedback / standby status update to PostgreSQL. + pub async fn send_feedback(&self) -> Result<()> { + let (tx, rx) = oneshot::channel(); + self.cmd_tx + .send(ActorCommand::SendFeedback { reply: tx }) + .await + .map_err(|_| CdcError::generic("Replication actor has shut down"))?; + rx.await + .map_err(|_| CdcError::generic("Replication actor dropped reply"))? + } + + /// Get the current LSN position. 
+ pub async fn current_lsn(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.cmd_tx + .send(ActorCommand::CurrentLsn { reply: tx }) + .await + .map_err(|_| CdcError::generic("Replication actor has shut down"))?; + rx.await + .map_err(|_| CdcError::generic("Replication actor dropped reply")) + } + + /// Gracefully stop the replication stream and shut down the actor thread. + pub async fn stop(&self) -> Result<()> { + let (tx, rx) = oneshot::channel(); + // If the actor is already gone, that's OK. + let _ = self.cmd_tx.send(ActorCommand::Stop { reply: tx }).await; + rx.await + .map_err(|_| CdcError::generic("Replication actor dropped reply"))? + } + + /// Wait for the actor thread to exit (call after `stop`). + pub fn join(&mut self) { + if let Some(handle) = self.join_handle.take() { + if let Err(e) = handle.join() { + error!("Replication actor thread panicked: {:?}", e); + } + } + } +} + +impl Drop for ReplicationActorHandle { + fn drop(&mut self) { + // Best-effort: try to tell the actor to stop. If the channel is already closed the actor is gone and we don't need to do anything. 
+ let _ = self.cmd_tx.try_send(ActorCommand::Stop { + reply: oneshot::channel().0, + }); + self.join(); + } +} + +// --------------------------------------------------------------------------- +// Actor event loop — runs on the dedicated thread +// --------------------------------------------------------------------------- + +async fn actor_loop( + mut stream: LogicalReplicationStream, + mut cmd_rx: mpsc::Receiver, +) { + info!("Replication actor started on dedicated thread"); + + while let Some(cmd) = cmd_rx.recv().await { + match cmd { + ActorCommand::Start { start_lsn, reply } => { + let result = stream.start(start_lsn).await.map_err(CdcError::from); + let _ = reply.send(result); + } + + ActorCommand::NextEvent { cancel, reply } => { + let result = stream + .next_event_with_retry(&cancel) + .await + .map_err(CdcError::from); + let _ = reply.send(result); + } + + ActorCommand::SendFeedback { reply } => { + let result = stream.send_feedback().await.map_err(CdcError::from); + let _ = reply.send(result); + } + + ActorCommand::CurrentLsn { reply } => { + let _ = reply.send(stream.current_lsn()); + } + + ActorCommand::Stop { reply } => { + info!("Replication actor received stop command"); + let result = stream.stop().await.map_err(CdcError::from); + let _ = reply.send(result); + break; // Exit the actor loop + } + } + } + + debug!("Replication actor loop exited"); +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Verify that `ReplicationActorHandle` is Send + Sync (compile-time check). 
+ fn _assert_send_sync() { + fn assert_send() {} + fn assert_sync() {} + assert_send::(); + assert_sync::(); + } +} diff --git a/pg2any-lib/src/transaction_manager.rs b/pg2any-lib/src/transaction_manager.rs index ebf1b30..ff6e726 100644 --- a/pg2any-lib/src/transaction_manager.rs +++ b/pg2any-lib/src/transaction_manager.rs @@ -44,6 +44,7 @@ use crate::storage::{CompressionIndex, SqlStreamParser, StorageFactory, Transact use crate::types::{ChangeEvent, DestinationType, EventType, Lsn, ReplicaIdentity, RowData}; use async_compression::tokio::bufread::GzipDecoder; use chrono::{DateTime, Utc}; +use pg_walstream::ColumnValue; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::{Path, PathBuf}; @@ -1046,16 +1047,13 @@ impl TransactionManager { /// Generate INSERT SQL command /// - /// Accepts `&RowData` directly. Converts to HashMap internally because - /// column iteration is required (RowData does not yet expose `iter()`). + /// Accepts `&RowData` directly. Uses `iter()` for column iteration. fn generate_insert_sql(&self, schema: &str, table: &str, new_data: &RowData) -> Result { let schema = self.map_schema(Some(schema)); let (columns, values): (Vec, Vec) = new_data - .clone() - .into_hash_map() - .into_iter() - .map(|(k, v)| (k, self.format_value(&v))) + .iter() + .map(|(k, v)| (k.to_string(), self.format_value(v))) .unzip(); let sql = match self.destination_type { @@ -1114,9 +1112,7 @@ impl TransactionManager { ) -> Result { let schema = self.map_schema(Some(schema)); - // Convert to HashMap for SET clause iteration (RowData lacks iter()) - let new_data_map = new_data.clone().into_hash_map(); - let set_clause: Vec = new_data_map + let set_clause: Vec = new_data .iter() .map(|(col, val)| { let formatted_col = match self.destination_type { @@ -1259,13 +1255,11 @@ impl TransactionManager { .collect::>>()? 
} ReplicaIdentity::Full => { - // Use all columns from old data — must convert to HashMap for iteration + // Use all columns from old data let data = old_data.ok_or_else(|| { CdcError::Generic("FULL replica identity requires old data".to_string()) })?; - let data_map = data.clone().into_hash_map(); - data_map - .iter() + data.iter() .map(|(col, val)| { let formatted_col = match self.destination_type { DestinationType::MySQL => format!("`{col}`"), @@ -1290,27 +1284,27 @@ impl TransactionManager { Ok(conditions.join(" AND ")) } - /// Format a JSON value as SQL literal - fn format_value(&self, value: &serde_json::Value) -> String { + /// Format a `ColumnValue` as a SQL literal + fn format_value(&self, value: &ColumnValue) -> String { match value { - serde_json::Value::Null => "NULL".to_string(), - serde_json::Value::Bool(b) => { - if *b { - "TRUE".to_string() - } else { - "FALSE".to_string() + ColumnValue::Null => "NULL".to_string(), + ColumnValue::Text(b) => { + match std::str::from_utf8(b) { + Ok(s) => { + // Escape single quotes by doubling them (SQL standard) + let escaped = s.replace('\'', "''"); + format!("'{escaped}'") + } + Err(_) => { + // Non-UTF-8 text: hex-encode as binary literal + let hex: String = b.iter().map(|byte| format!("{byte:02x}")).collect(); + format!("X'{hex}'") + } } } - serde_json::Value::Number(n) => n.to_string(), - serde_json::Value::String(s) => { - // Escape single quotes by doubling them (SQL standard) - let escaped = s.replace('\'', "''"); - format!("'{escaped}'") - } - serde_json::Value::Array(_) | serde_json::Value::Object(_) => { - // For complex types, serialize as JSON string - let json_str = value.to_string().replace('\'', "''"); - format!("'{json_str}'") + ColumnValue::Binary(b) => { + let hex: String = b.iter().map(|byte| format!("{byte:02x}")).collect(); + format!("X'{hex}'") } } } diff --git a/pg2any-lib/tests/destination_integration_tests.rs b/pg2any-lib/tests/destination_integration_tests.rs index 02baae6..b20749a 100644 
--- a/pg2any-lib/tests/destination_integration_tests.rs +++ b/pg2any-lib/tests/destination_integration_tests.rs @@ -3,7 +3,7 @@ use pg2any_lib::{ types::{ChangeEvent, EventType, ReplicaIdentity}, DestinationType, }; -use pg_walstream::{Lsn, RowData}; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; /// Test that destination handlers have consistent interfaces @@ -103,9 +103,9 @@ fn test_unsupported_destination_types() { // Helper functions to create test events fn create_test_event() -> ChangeEvent { let data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("test".to_string())), - ("active", serde_json::Value::Bool(true)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("test")), + ("active", ColumnValue::text("t")), ]); ChangeEvent::insert("public", "test_table", 456, data, Lsn::from(100)) @@ -113,13 +113,13 @@ fn create_test_event() -> ChangeEvent { fn create_update_event() -> ChangeEvent { let old_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("old_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("old_name")), ]); let new_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("new_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("new_name")), ]); ChangeEvent::update( @@ -136,11 +136,8 @@ fn create_update_event() -> ChangeEvent { fn create_delete_event() -> ChangeEvent { let old_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ( - "name", - serde_json::Value::String("deleted_name".to_string()), - ), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("deleted_name")), ]); ChangeEvent::delete( @@ -156,8 +153,8 @@ fn create_delete_event() -> ChangeEvent 
{ fn create_update_event_without_old_data() -> ChangeEvent { let new_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("new_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("new_name")), ]); ChangeEvent::update( @@ -198,18 +195,12 @@ fn test_mysql_destination_update_with_old_data() { // Verify that old_data contains key information for WHERE clause assert!(old_data.get("id").is_some()); - assert_eq!( - old_data.get("id").unwrap(), - &serde_json::Value::Number(serde_json::Number::from(1)) - ); + assert_eq!(old_data.get("id").unwrap(), "1"); // Verify that new_data contains updated information assert!(new_data.get("id").is_some()); assert!(new_data.get("name").is_some()); - assert_eq!( - new_data.get("name").unwrap(), - &serde_json::Value::String("new_name".to_string()) - ); + assert_eq!(new_data.get("name").unwrap(), "new_name"); } else { panic!("Expected Update event"); } @@ -254,18 +245,12 @@ fn test_sqlserver_destination_update_with_old_data() { // Verify that old_data contains key information for WHERE clause assert!(old_data.get("id").is_some()); - assert_eq!( - old_data.get("id").unwrap(), - &serde_json::Value::Number(serde_json::Number::from(1)) - ); + assert_eq!(old_data.get("id").unwrap(), "1"); // Verify that new_data contains updated information assert!(new_data.get("id").is_some()); assert!(new_data.get("name").is_some()); - assert_eq!( - new_data.get("name").unwrap(), - &serde_json::Value::String("new_name".to_string()) - ); + assert_eq!(new_data.get("name").unwrap(), "new_name"); } _ => panic!("Expected Update event"), } @@ -281,10 +266,7 @@ fn test_delete_event_uses_old_data() { EventType::Delete { old_data, .. 
} => { assert!(old_data.get("id").is_some()); assert!(old_data.get("name").is_some()); - assert_eq!( - old_data.get("name").unwrap(), - &serde_json::Value::String("deleted_name".to_string()) - ); + assert_eq!(old_data.get("name").unwrap(), "deleted_name"); } _ => panic!("Expected Delete event"), } diff --git a/pg2any-lib/tests/event_type_refactor_tests.rs b/pg2any-lib/tests/event_type_refactor_tests.rs index 001cec9..b8fbeba 100644 --- a/pg2any-lib/tests/event_type_refactor_tests.rs +++ b/pg2any-lib/tests/event_type_refactor_tests.rs @@ -1,12 +1,14 @@ use chrono::Utc; use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; -use serde_json::json; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; #[test] fn test_new_event_type_insert() { - let data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("test"))]); + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("test")), + ]); let event = ChangeEvent::insert("public", "users", 123, data.clone(), Lsn::from(100)); @@ -28,9 +30,15 @@ fn test_new_event_type_insert() { #[test] fn test_new_event_type_update() { - let old_data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("old_name"))]); + let old_data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("old_name")), + ]); - let new_data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("new_name"))]); + let new_data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("new_name")), + ]); let event = ChangeEvent::update( "public", @@ -68,7 +76,10 @@ fn test_new_event_type_update() { #[test] fn test_new_event_type_delete() { - let old_data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("deleted_name"))]); + let old_data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("deleted_name")), + ]); let event 
= ChangeEvent::delete( "public", @@ -145,7 +156,10 @@ fn test_new_event_type_truncate() { #[test] fn test_event_serialization() { - let data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("test"))]); + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("test")), + ]); let event = ChangeEvent::insert("public", "users", 123, data, Lsn::from(100)); diff --git a/pg2any-lib/tests/metrics_logical_tests.rs b/pg2any-lib/tests/metrics_logical_tests.rs index fea5c5a..85095d4 100644 --- a/pg2any-lib/tests/metrics_logical_tests.rs +++ b/pg2any-lib/tests/metrics_logical_tests.rs @@ -10,6 +10,7 @@ use pg2any_lib::monitoring::{ ProcessingTimerTrait, }; use pg2any_lib::types::{ChangeEvent, ReplicaIdentity}; +use pg_walstream::ColumnValue; use pg_walstream::Lsn; use pg_walstream::RowData; use std::sync::Arc; @@ -18,8 +19,8 @@ use std::time::Duration; /// Helper function to create test change events fn create_test_insert_event() -> ChangeEvent { let data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("test".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("test")), ]); ChangeEvent::insert("public", "users", 12345, data, Lsn::from(100)) @@ -27,13 +28,13 @@ fn create_test_insert_event() -> ChangeEvent { fn create_test_update_event() -> ChangeEvent { let old_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("old_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("old_name")), ]); let new_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ("name", serde_json::Value::String("new_name".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("new_name")), ]); ChangeEvent::update( @@ -50,11 +51,8 @@ fn create_test_update_event() -> 
ChangeEvent { fn create_test_delete_event() -> ChangeEvent { let old_data = RowData::from_pairs(vec![ - ("id", serde_json::Value::Number(serde_json::Number::from(1))), - ( - "name", - serde_json::Value::String("deleted_name".to_string()), - ), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("deleted_name")), ]); ChangeEvent::delete( diff --git a/pg2any-lib/tests/mysql_edge_cases_tests.rs b/pg2any-lib/tests/mysql_edge_cases_tests.rs index a9cef7c..920a839 100644 --- a/pg2any-lib/tests/mysql_edge_cases_tests.rs +++ b/pg2any-lib/tests/mysql_edge_cases_tests.rs @@ -1,8 +1,7 @@ /// Integration tests for MySQL destination error handling and edge cases /// These tests verify error conditions and edge cases in the WHERE clause generation use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; -use serde_json::Value; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; /// Test scenarios that should result in errors when processed by MySQL destination @@ -13,8 +12,7 @@ mod mysql_error_scenarios { #[test] fn test_missing_key_column_scenario() { // Create event where key column is missing from data - let incomplete_data = - RowData::from_pairs(vec![("name", Value::String("test".to_string()))]); + let incomplete_data = RowData::from_pairs(vec![("name", ColumnValue::text("test"))]); // Missing "id" which is specified as key column let event = ChangeEvent::update( @@ -100,7 +98,7 @@ mod mysql_error_scenarios { #[test] fn test_empty_key_columns_scenario() { // Test scenario with empty key columns (NOTHING replica identity) - let data = RowData::from_pairs(vec![("some_data", Value::String("value".to_string()))]); + let data = RowData::from_pairs(vec![("some_data", ColumnValue::text("value"))]); let event = ChangeEvent::update( "public", @@ -124,7 +122,7 @@ mod mysql_error_scenarios { fn test_partial_key_columns_missing() { // Test composite key where some key columns are missing let incomplete_data = - 
RowData::from_pairs(vec![("tenant_id", Value::String("tenant_1".to_string()))]); + RowData::from_pairs(vec![("tenant_id", ColumnValue::text("tenant_1"))]); // Missing "user_id" which is part of composite key let event = ChangeEvent::update( @@ -160,13 +158,13 @@ mod data_source_selection_tests { fn test_data_source_priority_order() { // Test that old_data is always preferred over new_data when available let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("value", Value::String("old_value".to_string())), + ("id", ColumnValue::text("1")), + ("value", ColumnValue::text("old_value")), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("value", Value::String("new_value".to_string())), + ("id", ColumnValue::text("1")), + ("value", ColumnValue::text("new_value")), ]); let event = ChangeEvent::update( @@ -193,11 +191,11 @@ mod data_source_selection_tests { // Verify old_data is selected despite new_data being available assert_eq!( data_source.get("value"), - Some(&Value::String("old_value".to_string())) + Some(&ColumnValue::text("old_value")) ); assert_ne!( data_source.get("value"), - Some(&Value::String("new_value".to_string())) + Some(&ColumnValue::text("new_value")) ); } _ => panic!("Expected Update event"), @@ -208,8 +206,8 @@ mod data_source_selection_tests { fn test_fallback_only_when_old_data_none() { // Test that fallback to new_data only happens when old_data is None let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("value", Value::String("fallback_value".to_string())), + ("id", ColumnValue::text("1")), + ("value", ColumnValue::text("fallback_value")), ]); let event = ChangeEvent::update( @@ -235,7 +233,7 @@ mod data_source_selection_tests { // Verify new_data is used when old_data is None assert_eq!( data_source.get("value"), - Some(&Value::String("fallback_value".to_string())) + 
Some(&ColumnValue::text("fallback_value")) ); } _ => panic!("Expected Update event"), @@ -251,25 +249,16 @@ mod complex_data_tests { #[test] fn test_complex_json_data_in_where_clause() { // Test with complex JSON data structures - let complex_json = Value::Object(serde_json::Map::from_iter([( - "nested".to_string(), - Value::Object(serde_json::Map::from_iter([( - "array".to_string(), - Value::Array(vec![ - Value::Number(serde_json::Number::from(1)), - Value::String("text".to_string()), - ]), - )])), - )])); + let complex_json_str = r#"{"nested":{"array":[1,"text"]}}"#; let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("json_data", complex_json.clone()), + ("id", ColumnValue::text("1")), + ("json_data", ColumnValue::text(complex_json_str)), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("json_data", complex_json), + ("id", ColumnValue::text("1")), + ("json_data", ColumnValue::text(complex_json_str)), ]); let event = ChangeEvent::update( @@ -290,7 +279,7 @@ mod complex_data_tests { assert!(data_source.get("json_data").is_some()); assert!(matches!( data_source.get("json_data"), - Some(Value::Object(_)) + Some(ColumnValue::Text(_)) )); } _ => panic!("Expected Update event"), @@ -301,10 +290,10 @@ mod complex_data_tests { fn test_various_data_types_in_key_columns() { // Test with different PostgreSQL data types in key columns let old_data = RowData::from_pairs(vec![ - ("string_key", Value::String("key_value".to_string())), - ("int_key", Value::Number(serde_json::Number::from(42))), - ("bool_key", Value::Bool(true)), - ("null_key", Value::Null), + ("string_key", ColumnValue::text("key_value")), + ("int_key", ColumnValue::text("42")), + ("bool_key", ColumnValue::text("true")), + ("null_key", ColumnValue::Null), ]); let event = ChangeEvent::update( @@ -330,14 +319,20 @@ mod complex_data_tests { // Verify all data types are preserved assert!(matches!( 
data_source.get("string_key"), - Some(Value::String(_)) + Some(ColumnValue::Text(_)) + )); + assert!(matches!( + data_source.get("int_key"), + Some(ColumnValue::Text(_)) )); - assert!(matches!(data_source.get("int_key"), Some(Value::Number(_)))); assert!(matches!( data_source.get("bool_key"), - Some(Value::Bool(true)) + Some(ColumnValue::Text(_)) + )); + assert!(matches!( + data_source.get("null_key"), + Some(ColumnValue::Null) )); - assert!(matches!(data_source.get("null_key"), Some(Value::Null))); } _ => panic!("Expected Update event"), } @@ -365,8 +360,7 @@ mod schema_table_tests { ]; for (schema, table) in test_cases { - let data = - RowData::from_pairs(vec![("id", Value::Number(serde_json::Number::from(1)))]); + let data = RowData::from_pairs(vec![("id", ColumnValue::text("1"))]); let event = ChangeEvent::update( schema.to_string(), diff --git a/pg2any-lib/tests/mysql_error_handling_simple_tests.rs b/pg2any-lib/tests/mysql_error_handling_simple_tests.rs index b8009b2..f67f8d1 100644 --- a/pg2any-lib/tests/mysql_error_handling_simple_tests.rs +++ b/pg2any-lib/tests/mysql_error_handling_simple_tests.rs @@ -2,9 +2,9 @@ /// These tests verify the specific error conditions and messages that should be /// generated by the build_where_clause method in MySQL destination use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; +use pg_walstream::ColumnValue; use pg_walstream::Lsn; use pg_walstream::RowData; -use serde_json::Value; use std::sync::Arc; /// Tests for error conditions in WHERE clause generation @@ -19,8 +19,7 @@ mod where_clause_error_tests { // in the MySQL destination's build_where_clause method // Test Case 1: Missing key column in data - let incomplete_data = - RowData::from_pairs(vec![("name", Value::String("test".to_string()))]); + let incomplete_data = RowData::from_pairs(vec![("name", ColumnValue::text("test"))]); // Missing "id" which is the key column let event_missing_key = ChangeEvent::update( @@ -51,7 +50,7 @@ mod 
where_clause_error_tests { fn test_replica_identity_nothing_error_conditions() { // Test error conditions specific to NOTHING replica identity - let new_data = RowData::from_pairs(vec![("data", Value::String("some data".to_string()))]); + let new_data = RowData::from_pairs(vec![("data", ColumnValue::text("some data"))]); // UPDATE with NOTHING replica identity let update_event = ChangeEvent::update( diff --git a/pg2any-lib/tests/mysql_where_clause_fix_tests.rs b/pg2any-lib/tests/mysql_where_clause_fix_tests.rs index b4eabd2..e57b3a8 100644 --- a/pg2any-lib/tests/mysql_where_clause_fix_tests.rs +++ b/pg2any-lib/tests/mysql_where_clause_fix_tests.rs @@ -2,22 +2,21 @@ /// These tests verify the critical bug fix where UPDATE/DELETE operations /// now correctly use old_data when available, falling back to new_data when needed use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; -use serde_json::Value; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; #[test] fn test_update_uses_old_data_for_where_clause() { // This test verifies the core fix: UPDATE should use old_data for WHERE clause let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(42))), - ("version", Value::Number(serde_json::Number::from(1))), + ("id", ColumnValue::text("42")), + ("version", ColumnValue::text("1")), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(42))), - ("version", Value::Number(serde_json::Number::from(2))), // Version incremented - ("name", Value::String("Updated Name".to_string())), + ("id", ColumnValue::text("42")), + ("version", ColumnValue::text("2")), // Version incremented + ("name", ColumnValue::text("Updated Name")), ]); let event = ChangeEvent::update( @@ -43,10 +42,7 @@ fn test_update_uses_old_data_for_where_clause() { }; // Verify old_data is selected and contains replica identity values - assert_eq!( - data_source.get("version"), - 
Some(&Value::Number(serde_json::Number::from(1))) - ); + assert_eq!(data_source.get("version"), Some(&ColumnValue::text("1"))); assert!(data_source.get("id").is_some()); // WHERE clause should use version=1 (old), not version=2 (new) @@ -60,8 +56,8 @@ fn test_update_uses_old_data_for_where_clause() { fn test_update_fallback_to_new_data_when_old_data_none() { // Test the fallback behavior when old_data is None let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(100))), - ("status", Value::String("active".to_string())), + ("id", ColumnValue::text("100")), + ("status", ColumnValue::text("active")), ]); let event = ChangeEvent::update( @@ -86,13 +82,10 @@ fn test_update_fallback_to_new_data_when_old_data_none() { }; // Verify fallback works - assert_eq!( - data_source.get("id"), - Some(&Value::Number(serde_json::Number::from(100))) - ); + assert_eq!(data_source.get("id"), Some(&ColumnValue::text("100"))); assert_eq!( data_source.get("status"), - Some(&Value::String("active".to_string())) + Some(&ColumnValue::text("active")) ); } _ => panic!("Expected Update event"), @@ -103,8 +96,8 @@ fn test_update_fallback_to_new_data_when_old_data_none() { fn test_delete_always_uses_old_data() { // DELETE operations should always have old_data available let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(999))), - ("email", Value::String("delete@example.com".to_string())), + ("id", ColumnValue::text("999")), + ("email", ColumnValue::text("delete@example.com")), ]); let event = ChangeEvent::delete( @@ -120,13 +113,10 @@ fn test_delete_always_uses_old_data() { // For DELETE, old_data should always be available match &event.event_type { EventType::Delete { old_data, .. 
} => { - assert_eq!( - old_data.get("id"), - Some(&Value::Number(serde_json::Number::from(999))) - ); + assert_eq!(old_data.get("id"), Some(&ColumnValue::text("999"))); assert_eq!( old_data.get("email"), - Some(&Value::String("delete@example.com".to_string())) + Some(&ColumnValue::text("delete@example.com")) ); } _ => panic!("Expected Delete event"), @@ -137,15 +127,15 @@ fn test_delete_always_uses_old_data() { fn test_composite_key_data_source_selection() { // Test with composite primary key let old_data = RowData::from_pairs(vec![ - ("tenant_id", Value::String("tenant_1".to_string())), - ("user_id", Value::Number(serde_json::Number::from(42))), - ("balance", Value::Number(serde_json::Number::from(100))), + ("tenant_id", ColumnValue::text("tenant_1")), + ("user_id", ColumnValue::text("42")), + ("balance", ColumnValue::text("100")), ]); let new_data = RowData::from_pairs(vec![ - ("tenant_id", Value::String("tenant_1".to_string())), - ("user_id", Value::Number(serde_json::Number::from(42))), - ("balance", Value::Number(serde_json::Number::from(150))), // Updated + ("tenant_id", ColumnValue::text("tenant_1")), + ("user_id", ColumnValue::text("42")), + ("balance", ColumnValue::text("150")), // Updated ]); let event = ChangeEvent::update( @@ -172,12 +162,9 @@ fn test_composite_key_data_source_selection() { // Both key columns should be available in old_data assert_eq!( data_source.get("tenant_id"), - Some(&Value::String("tenant_1".to_string())) - ); - assert_eq!( - data_source.get("user_id"), - Some(&Value::Number(serde_json::Number::from(42))) + Some(&ColumnValue::text("tenant_1")) ); + assert_eq!(data_source.get("user_id"), Some(&ColumnValue::text("42"))); // WHERE clause: WHERE tenant_id='tenant_1' AND user_id=42 } @@ -189,15 +176,15 @@ fn test_composite_key_data_source_selection() { fn test_replica_identity_full_uses_all_columns() { // Test FULL replica identity with all columns in WHERE clause let old_data = RowData::from_pairs(vec![ - ("id", 
Value::Number(serde_json::Number::from(1))), - ("name", Value::String("John".to_string())), - ("age", Value::Number(serde_json::Number::from(25))), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John")), + ("age", ColumnValue::text("25")), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("name", Value::String("Johnny".to_string())), // Changed - ("age", Value::Number(serde_json::Number::from(26))), // Changed + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Johnny")), // Changed + ("age", ColumnValue::text("26")), // Changed ]); let event = ChangeEvent::update( @@ -216,14 +203,8 @@ fn test_replica_identity_full_uses_all_columns() { let data_source = old_data.as_ref().unwrap(); // All columns available in old_data for FULL replica identity - assert_eq!( - data_source.get("name"), - Some(&Value::String("John".to_string())) - ); - assert_eq!( - data_source.get("age"), - Some(&Value::Number(serde_json::Number::from(25))) - ); + assert_eq!(data_source.get("name"), Some(&ColumnValue::text("John"))); + assert_eq!(data_source.get("age"), Some(&ColumnValue::text("25"))); // WHERE: WHERE id=1 AND name='John' AND age=25 (old values) // SET: SET id=1, name='Johnny', age=26 (new values) @@ -234,15 +215,15 @@ fn test_replica_identity_full_uses_all_columns() { #[test] fn test_json_null_values_in_replica_identity() { - // Test handling of JSON null values + // Test handling of null values let old_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("optional_field", Value::Null), + ("id", ColumnValue::text("1")), + ("optional_field", ColumnValue::Null), ]); let new_data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("optional_field", Value::String("now has value".to_string())), + ("id", ColumnValue::text("1")), + ("optional_field", ColumnValue::text("now has value")), ]); let event = ChangeEvent::update( @@ -261,7 +242,7 @@ 
fn test_json_null_values_in_replica_identity() { let data_source = old_data.as_ref().unwrap(); // Null values should be preserved in old_data - assert_eq!(data_source.get("optional_field"), Some(&Value::Null)); + assert_eq!(data_source.get("optional_field"), Some(&ColumnValue::Null)); // WHERE clause should handle: WHERE id=1 AND optional_field IS NULL } @@ -273,8 +254,8 @@ fn test_json_null_values_in_replica_identity() { fn test_key_columns_availability() { // Test that key columns are correctly accessible let data = RowData::from_pairs(vec![ - ("id", Value::Number(serde_json::Number::from(1))), - ("name", Value::String("Test".to_string())), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Test")), ]); // Test DEFAULT replica identity (primary key only) diff --git a/pg2any-lib/tests/replica_identity_tests.rs b/pg2any-lib/tests/replica_identity_tests.rs index 70b4c62..17d18e0 100644 --- a/pg2any-lib/tests/replica_identity_tests.rs +++ b/pg2any-lib/tests/replica_identity_tests.rs @@ -1,18 +1,17 @@ use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; -use serde_json::Value; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; #[tokio::test] async fn test_replica_identity_default_with_primary_key() { let new_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("John".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("John")), ]); let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("Jane".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("Jane")), ]); let event = ChangeEvent::update( @@ -41,15 +40,15 @@ async fn test_replica_identity_default_with_primary_key() { #[tokio::test] async fn test_replica_identity_full() { let new_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", 
Value::String("John".to_string())), - ("email", Value::String("john@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("John")), + ("email", ColumnValue::text("john@example.com")), ]); let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("Jane".to_string())), - ("email", Value::String("jane@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("Jane")), + ("email", ColumnValue::text("jane@example.com")), ]); let event = ChangeEvent::update( @@ -76,14 +75,14 @@ async fn test_replica_identity_full() { #[tokio::test] async fn test_replica_identity_index() { let new_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("John".to_string())), - ("email", Value::String("john@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("John")), + ("email", ColumnValue::text("john@example.com")), ]); let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("email", Value::String("jane@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("email", ColumnValue::text("jane@example.com")), ]); let event = ChangeEvent::update( @@ -110,8 +109,8 @@ async fn test_replica_identity_index() { #[tokio::test] async fn test_replica_identity_nothing() { let new_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("John".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("John")), ]); let event = ChangeEvent::update( @@ -136,8 +135,8 @@ async fn test_replica_identity_nothing() { #[tokio::test] async fn test_delete_with_replica_identity_default() { let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("Jane".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("Jane")), ]); 
let event = ChangeEvent::delete( @@ -164,9 +163,9 @@ async fn test_delete_with_replica_identity_default() { #[tokio::test] async fn test_delete_with_replica_identity_full() { let old_data = RowData::from_pairs(vec![ - ("id", Value::String("123".to_string())), - ("name", Value::String("Jane".to_string())), - ("email", Value::String("jane@example.com".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("Jane")), + ("email", ColumnValue::text("jane@example.com")), ]); let event = ChangeEvent::delete( diff --git a/pg2any-lib/tests/sqlite_comprehensive_tests.rs b/pg2any-lib/tests/sqlite_comprehensive_tests.rs index 6b9c382..ce450e8 100644 --- a/pg2any-lib/tests/sqlite_comprehensive_tests.rs +++ b/pg2any-lib/tests/sqlite_comprehensive_tests.rs @@ -4,8 +4,7 @@ use pg2any_lib::{ types::{ChangeEvent, DestinationType, ReplicaIdentity}, Transaction, }; -use pg_walstream::{Lsn, RowData}; -use serde_json::json; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -27,9 +26,8 @@ fn event_to_sql(event: &ChangeEvent) -> Option { use pg2any_lib::types::EventType; match &event.event_type { EventType::Insert { table, data, .. } => { - let data_map = data.clone().into_hash_map(); - let cols: Vec<_> = data_map.keys().cloned().collect(); - let vals: Vec<_> = cols.iter().map(|c| format_value(&data_map[c])).collect(); + let cols: Vec = data.iter().map(|(k, _)| k.to_string()).collect(); + let vals: Vec = data.iter().map(|(_, v)| format_column_value(v)).collect(); Some(format!( "INSERT INTO \"{}\" ({}) VALUES ({});", table, @@ -47,19 +45,18 @@ fn event_to_sql(event: &ChangeEvent) -> Option { key_columns, .. 
} => { - let new_data_map = new_data.clone().into_hash_map(); - let set: Vec<_> = new_data_map + let set: Vec = new_data .iter() - .map(|(k, v)| format!("\"{}\" = {}", k, format_value(v))) + .map(|(k, v)| format!("\"{}\" = {}", k, format_column_value(v))) .collect(); - let cond: Vec<_> = key_columns + let cond: Vec = key_columns .iter() .filter_map(|k| { old_data .as_ref() .and_then(|d| d.get(k.as_ref())) .or_else(|| new_data.get(k.as_ref())) - .map(|v| format!("\"{}\" = {}", k, format_value(v))) + .map(|v| format!("\"{}\" = {}", k, format_column_value(v))) }) .collect(); if cond.is_empty() { @@ -78,12 +75,12 @@ fn event_to_sql(event: &ChangeEvent) -> Option { key_columns, .. } => { - let cond: Vec<_> = key_columns + let cond: Vec = key_columns .iter() .filter_map(|k| { old_data .get(k.as_ref()) - .map(|v| format!("\"{}\" = {}", k, format_value(v))) + .map(|v| format!("\"{}\" = {}", k, format_column_value(v))) }) .collect(); if cond.is_empty() { @@ -99,13 +96,31 @@ fn event_to_sql(event: &ChangeEvent) -> Option { } } -fn format_value(v: &serde_json::Value) -> String { - match v { - serde_json::Value::Null => "NULL".to_string(), - serde_json::Value::Bool(b) => if *b { "1" } else { "0" }.to_string(), - serde_json::Value::Number(n) => n.to_string(), - serde_json::Value::String(s) => format!("'{}'", s.replace("'", "''")), - _ => format!("'{}'", v.to_string().replace("'", "''")), +fn format_column_value(value: &ColumnValue) -> String { + match value { + ColumnValue::Null => "NULL".to_string(), + ColumnValue::Text(b) => match std::str::from_utf8(b) { + Ok(s) => { + if s.parse::().is_ok() || s.parse::().is_ok() { + s.to_string() + } else if s == "true" { + "1".to_string() + } else if s == "false" { + "0".to_string() + } else { + format!("'{}'", s.replace('\'', "''")) + } + } + Err(_) => "NULL".to_string(), + }, + ColumnValue::Binary(b) => { + format!( + "X'{}'", + b.iter() + .map(|byte| format!("{:02x}", byte)) + .collect::() + ) + } } } @@ -218,9 +233,9 @@ async fn 
test_sqlite_empty_string_handling() { // Test empty string insertion let data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("text_field", json!("")), - ("nullable_field", json!("")), + ("id", ColumnValue::text("1")), + ("text_field", ColumnValue::text("")), + ("nullable_field", ColumnValue::text("")), ]); let event = ChangeEvent::insert("main", "comprehensive_test", 123, data, Lsn::from(100)); @@ -263,11 +278,11 @@ async fn test_sqlite_null_value_handling() { // Test null value insertion let data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("text_field", json!("test")), - ("nullable_field", json!(null)), - ("int_field", json!(null)), - ("real_field", json!(null)), + ("id", ColumnValue::text("1")), + ("text_field", ColumnValue::text("test")), + ("nullable_field", ColumnValue::Null), + ("int_field", ColumnValue::Null), + ("real_field", ColumnValue::Null), ]); let event = ChangeEvent::insert("main", "comprehensive_test", 123, data, Lsn::from(100)); @@ -315,14 +330,14 @@ async fn test_sqlite_unicode_and_special_characters() { // Test unicode and special character insertion let data = RowData::from_pairs(vec![ - ("id", json!(1)), + ("id", ColumnValue::text("1")), ( "text_field", - json!("🚀 Hello 世界! Special chars: áéíóú ñüç"), + ColumnValue::text("🚀 Hello 世界! 
Special chars: áéíóú ñüç"), ), ( "json_field", - json!("{\"emoji\": \"😀\", \"chinese\": \"你好\"}"), + ColumnValue::text("{\"emoji\": \"😀\", \"chinese\": \"你好\"}"), ), ]); @@ -370,17 +385,16 @@ async fn test_sqlite_large_data_handling() { // Create large text data (1MB) let large_text = "A".repeat(1024 * 1024); - let large_json = json!({ - "data": "B".repeat(500000), - "nested": { - "more_data": "C".repeat(500000) - } - }); + let large_json = format!( + "{{\"data\": \"{}\", \"nested\": {{\"more_data\": \"{}\"}}}}", + "B".repeat(500000), + "C".repeat(500000) + ); let data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("text_field", json!(large_text)), - ("json_field", large_json), + ("id", ColumnValue::text("1")), + ("text_field", ColumnValue::text(&large_text)), + ("json_field", ColumnValue::text(&large_json)), ]); let event = ChangeEvent::insert("main", "comprehensive_test", 123, data, Lsn::from(100)); @@ -423,16 +437,19 @@ async fn test_sqlite_numeric_precision() { create_comprehensive_test_table(&pool).await.unwrap(); // Test various numeric types and precision - let test_cases = vec![ - (1, json!(9223372036854775807i64)), // i64 max - (2, json!(-9223372036854775808i64)), // i64 min - (3, json!(3.141592653589793)), // High precision float - (4, json!(1.7976931348623157e308)), // Large float - (5, json!(2.2250738585072014e-308)), // Small float + let test_cases: Vec<(i32, ColumnValue)> = vec![ + (1, ColumnValue::text("9223372036854775807")), // i64 max + (2, ColumnValue::text("-9223372036854775808")), // i64 min + (3, ColumnValue::text("3.141592653589793")), // High precision float + (4, ColumnValue::text("1.7976931348623157e308")), // Large float + (5, ColumnValue::text("2.2250738585072014e-308")), // Small float ]; - for (id, value) in test_cases { - let data = RowData::from_pairs(vec![("id", json!(id)), ("real_field", value.clone())]); + for (id, value) in &test_cases { + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text(&id.to_string())), + 
("real_field", value.clone()), + ]); let event = ChangeEvent::insert("main", "comprehensive_test", 123, data, Lsn::from(100)); let result = destination @@ -496,8 +513,8 @@ async fn test_sqlite_constraint_violations() { // Insert initial data let data1 = RowData::from_pairs(vec![ - ("id", json!(1)), - ("unique_field", json!("unique_value")), + ("id", ColumnValue::text("1")), + ("unique_field", ColumnValue::text("unique_value")), ]); let event1 = ChangeEvent::insert("main", "comprehensive_test", 123, data1, Lsn::from(100)); @@ -511,8 +528,8 @@ async fn test_sqlite_constraint_violations() { // Try to insert duplicate unique value - should fail let data2 = RowData::from_pairs(vec![ - ("id", json!(2)), - ("unique_field", json!("unique_value")), + ("id", ColumnValue::text("2")), + ("unique_field", ColumnValue::text("unique_value")), ]); let event2 = ChangeEvent::insert("main", "comprehensive_test", 124, data2, Lsn::from(100)); @@ -546,7 +563,10 @@ async fn test_sqlite_missing_key_columns_error() { create_comprehensive_test_table(&pool).await.unwrap(); // Insert initial data - let data = RowData::from_pairs(vec![("id", json!(1)), ("text_field", json!("test"))]); + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("text_field", ColumnValue::text("test")), + ]); let event = ChangeEvent::insert( "main", @@ -613,12 +633,18 @@ async fn test_sqlite_bulk_operations_performance() { // Bulk INSERT operations for i in 0..batch_size { let data = RowData::from_pairs(vec![ - ("id", json!(i)), - ("name", json!(format!("User {}", i))), - ("age", json!(20 + (i % 50))), - ("email", json!(format!("user{}@example.com", i))), - ("active", json!(true)), - ("salary", json!(50000.0 + (i as f64 * 10.0))), + ("id", ColumnValue::text(&i.to_string())), + ("name", ColumnValue::text(&format!("User {}", i))), + ("age", ColumnValue::text(&(20 + (i % 50)).to_string())), + ( + "email", + ColumnValue::text(&format!("user{}@example.com", i)), + ), + ("active", 
ColumnValue::text("true")), + ( + "salary", + ColumnValue::text(&format!("{}", 50000.0 + (i as f64 * 10.0))), + ), ]); let event = ChangeEvent::insert("main", "users", 123 + i as u32, data, Lsn::from(100)); @@ -648,15 +674,21 @@ async fn test_sqlite_bulk_operations_performance() { // Bulk UPDATE operations let update_start = Instant::now(); for i in 0..batch_size / 2 { - let old_data = RowData::from_pairs(vec![("id", json!(i))]); + let old_data = RowData::from_pairs(vec![("id", ColumnValue::text(&i.to_string()))]); let new_data = RowData::from_pairs(vec![ - ("id", json!(i)), - ("name", json!(format!("Updated User {}", i))), - ("age", json!(25 + (i % 50))), - ("email", json!(format!("updated_user{}@example.com", i))), - ("active", json!(true)), - ("salary", json!(60000.0 + (i as f64 * 15.0))), + ("id", ColumnValue::text(&i.to_string())), + ("name", ColumnValue::text(&format!("Updated User {}", i))), + ("age", ColumnValue::text(&(25 + (i % 50)).to_string())), + ( + "email", + ColumnValue::text(&format!("updated_user{}@example.com", i)), + ), + ("active", ColumnValue::text("true")), + ( + "salary", + ColumnValue::text(&format!("{}", 60000.0 + (i as f64 * 15.0))), + ), ]); let event = ChangeEvent::update( @@ -728,15 +760,15 @@ async fn test_sqlite_concurrent_operations() { for i in 0..15 { let record_id = session_id * 1000 + i; let data = RowData::from_pairs(vec![ - ("id", json!(record_id)), + ("id", ColumnValue::text(&record_id.to_string())), ( "name", - json!(format!("Session {} Record {}", session_id, i)), + ColumnValue::text(&format!("Session {} Record {}", session_id, i)), ), - ("age", json!(25 + i % 30)), + ("age", ColumnValue::text(&(25 + i % 30).to_string())), ( "email", - json!(format!("session{}record{}@example.com", session_id, i)), + ColumnValue::text(&format!("session{}record{}@example.com", session_id, i)), ), ]); @@ -815,9 +847,9 @@ async fn test_sqlite_transaction_events() { // Insert some data between transaction events let data = 
RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("Transaction Test")), - ("email", json!("txn@example.com")), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Transaction Test")), + ("email", ColumnValue::text("txn@example.com")), ]); let insert_event = ChangeEvent::insert("main", "users", 105, data, Lsn::from(100)); @@ -901,7 +933,10 @@ async fn test_sqlite_connection_recovery() { .unwrap(); create_users_table(&pool).await.unwrap(); - let values = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("Alice"))]); + let values = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Alice")), + ]); let event = ChangeEvent::insert("public", "users", 1, values, Lsn::from(100)); let transaction = wrap_in_transaction(event); let process_result = destination @@ -918,7 +953,10 @@ async fn test_sqlite_connection_recovery() { assert!(reconnect_result.is_ok()); // Verify connection works again - let values2 = RowData::from_pairs(vec![("id", json!(2)), ("name", json!("Bob"))]); + let values2 = RowData::from_pairs(vec![ + ("id", ColumnValue::text("2")), + ("name", ColumnValue::text("Bob")), + ]); let event2 = ChangeEvent::insert("public", "users", 2, values2, Lsn::from(100)); let transaction2 = wrap_in_transaction(event2); let process_result2 = destination @@ -1007,12 +1045,12 @@ async fn test_sqlite_complete_crud_cycle() { // 1. 
CREATE (INSERT) let create_data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Doe")), - ("age", json!(30)), - ("email", json!("john@example.com")), - ("active", json!(true)), - ("salary", json!(50000.0)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Doe")), + ("age", ColumnValue::text("30")), + ("email", ColumnValue::text("john@example.com")), + ("active", ColumnValue::text("true")), + ("salary", ColumnValue::text("50000")), ]); let create_event = ChangeEvent::insert("main", "users", 1, create_data.clone(), Lsn::from(100)); @@ -1044,12 +1082,12 @@ async fn test_sqlite_complete_crud_cycle() { // 3. UPDATE let update_data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Smith")), - ("age", json!(31)), - ("email", json!("john.smith@example.com")), - ("active", json!(true)), - ("salary", json!(55000.0)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Smith")), + ("age", ColumnValue::text("31")), + ("email", ColumnValue::text("john.smith@example.com")), + ("active", ColumnValue::text("true")), + ("salary", ColumnValue::text("55000")), ]); let update_event = ChangeEvent::update( @@ -1086,7 +1124,7 @@ async fn test_sqlite_complete_crud_cycle() { assert_eq!(updated_row.get::("salary"), 55000.0); // 4. 
DELETE - let delete_data = RowData::from_pairs(vec![("id", json!(1))]); + let delete_data = RowData::from_pairs(vec![("id", ColumnValue::text("1"))]); let delete_event = ChangeEvent::delete( "main", @@ -1137,7 +1175,10 @@ async fn test_sqlite_destination_factory_integration() { .unwrap(); create_users_table(&pool).await.unwrap(); - let data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("Factory Test"))]); + let data = RowData::from_pairs(vec![ + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("Factory Test")), + ]); let event = ChangeEvent::insert("main", "users", 1, data, Lsn::from(100)); let process_result = destination diff --git a/pg2any-lib/tests/sqlite_destination_tests.rs b/pg2any-lib/tests/sqlite_destination_tests.rs index 41a9bf9..d299883 100644 --- a/pg2any-lib/tests/sqlite_destination_tests.rs +++ b/pg2any-lib/tests/sqlite_destination_tests.rs @@ -4,8 +4,7 @@ use pg2any_lib::{ types::{ChangeEvent, DestinationType, EventType, ReplicaIdentity}, Transaction, }; -use pg_walstream::{Lsn, RowData}; -use serde_json::json; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::fs; @@ -32,12 +31,8 @@ fn transaction_to_sql_commands(tx: &Transaction) -> Vec { fn event_to_sql(event: &ChangeEvent) -> Option { match &event.event_type { EventType::Insert { table, data, .. } => { - let data_map = data.clone().into_hash_map(); - let columns: Vec = data_map.keys().cloned().collect(); - let values: Vec = columns - .iter() - .map(|col| format_value(&data_map[col])) - .collect(); + let columns: Vec = data.iter().map(|(k, _)| k.to_string()).collect(); + let values: Vec = data.iter().map(|(_, v)| format_column_value(v)).collect(); Some(format!( "INSERT INTO \"{}\" ({}) VALUES ({});", table, @@ -57,10 +52,9 @@ fn event_to_sql(event: &ChangeEvent) -> Option { replica_identity, .. 
} => { - let new_data_map = new_data.clone().into_hash_map(); - let set_clause: Vec = new_data_map + let set_clause: Vec = new_data .iter() - .map(|(col, val)| format!("\"{}\" = {}", col, format_value(val))) + .map(|(col, val)| format!("\"{}\" = {}", col, format_column_value(val))) .collect(); // For REPLICA IDENTITY FULL with empty key_columns, use all old_data columns @@ -68,7 +62,7 @@ fn event_to_sql(event: &ChangeEvent) -> Option { if key_columns.is_empty() && matches!(replica_identity, ReplicaIdentity::Full) { old_data .as_ref() - .map(|d| d.clone().into_hash_map().into_keys().collect()) + .map(|d| d.iter().map(|(k, _)| k.to_string()).collect()) .unwrap_or_default() } else { key_columns.iter().map(|k| k.to_string()).collect() @@ -82,7 +76,7 @@ fn event_to_sql(event: &ChangeEvent) -> Option { .as_ref() .and_then(|d| d.get(col.as_str())) .or_else(|| new_data.get(col.as_str())) - .map(|val| format!("\"{}\" = {}", col, format_value(val))) + .map(|val| format!("\"{}\" = {}", col, format_column_value(val))) }) .collect(); if conditions.is_empty() { @@ -112,7 +106,7 @@ fn event_to_sql(event: &ChangeEvent) -> Option { .filter_map(|col| { old_data .get(col.as_ref()) - .map(|val| format!("\"{}\" = {}", col, format_value(val))) + .map(|val| format!("\"{}\" = {}", col, format_column_value(val))) }) .collect(); if conditions.is_empty() { @@ -143,14 +137,35 @@ fn event_to_sql(event: &ChangeEvent) -> Option { } } -/// Helper function to format a JSON value as SQL literal -fn format_value(value: &serde_json::Value) -> String { +/// Helper function to format a ColumnValue as SQL literal +fn format_column_value(value: &ColumnValue) -> String { match value { - serde_json::Value::Null => "NULL".to_string(), - serde_json::Value::Bool(b) => if *b { "1" } else { "0" }.to_string(), - serde_json::Value::Number(n) => n.to_string(), - serde_json::Value::String(s) => format!("'{}'", s.replace('\'', "''")), - _ => format!("'{}'", value.to_string().replace('\'', "''")), + ColumnValue::Null 
=> "NULL".to_string(), + ColumnValue::Text(b) => { + match std::str::from_utf8(b) { + Ok(s) => { + // Try to parse as number + if s.parse::().is_ok() || s.parse::().is_ok() { + s.to_string() + } else if s == "true" { + "1".to_string() + } else if s == "false" { + "0".to_string() + } else { + format!("'{}'", s.replace('\'', "''")) + } + } + Err(_) => "NULL".to_string(), + } + } + ColumnValue::Binary(b) => { + format!( + "X'{}'", + b.iter() + .map(|byte| format!("{:02x}", byte)) + .collect::() + ) + } } } @@ -198,24 +213,24 @@ impl Drop for TempDatabase { /// Helper function to create test data fn create_test_data() -> RowData { RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Doe")), - ("age", json!(30)), - ("email", json!("john@example.com")), - ("active", json!(true)), - ("salary", json!(50000.50)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Doe")), + ("age", ColumnValue::text("30")), + ("email", ColumnValue::text("john@example.com")), + ("active", ColumnValue::text("true")), + ("salary", ColumnValue::text("50000.50")), ]) } /// Helper function to create updated test data fn create_updated_test_data() -> RowData { RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Smith")), - ("age", json!(31)), - ("email", json!("john.smith@example.com")), - ("active", json!(true)), - ("salary", json!(55000.75)), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Smith")), + ("age", ColumnValue::text("31")), + ("email", ColumnValue::text("john.smith@example.com")), + ("active", ColumnValue::text("true")), + ("salary", ColumnValue::text("55000.75")), ]) } @@ -575,15 +590,15 @@ async fn test_sqlite_destination_replica_identity_full() { // Create UPDATE event with REPLICA IDENTITY FULL let old_data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Doe")), - ("email", json!("john@example.com")), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Doe")), + ("email", 
ColumnValue::text("john@example.com")), ]); let new_data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("name", json!("John Smith")), - ("email", json!("john.smith@example.com")), + ("id", ColumnValue::text("1")), + ("name", ColumnValue::text("John Smith")), + ("email", ColumnValue::text("john.smith@example.com")), ]); let event = ChangeEvent::update( @@ -681,13 +696,13 @@ async fn test_sqlite_destination_complex_data_types() { // Create INSERT event with complex data types let data = RowData::from_pairs(vec![ - ("id", json!(1)), - ("json_array", json!([1, 2, 3, "test"])), + ("id", ColumnValue::text("1")), + ("json_array", ColumnValue::text("[1,2,3,\"test\"]")), ( "json_object", - json!({"key": "value", "nested": {"number": 42}}), + ColumnValue::text("{\"key\":\"value\",\"nested\":{\"number\":42}}"), ), - ("null_value", json!(null)), + ("null_value", ColumnValue::Null), ]); let event = ChangeEvent::insert("main", "complex_data", 123, data, Lsn::from(100)); diff --git a/pg2any-lib/tests/where_clause_fix_tests.rs b/pg2any-lib/tests/where_clause_fix_tests.rs index 1f1c150..d3fb2fc 100644 --- a/pg2any-lib/tests/where_clause_fix_tests.rs +++ b/pg2any-lib/tests/where_clause_fix_tests.rs @@ -1,5 +1,5 @@ use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity}; -use pg_walstream::{Lsn, RowData}; +use pg_walstream::{ColumnValue, Lsn, RowData}; use std::sync::Arc; /// Test to demonstrate the fix for UPDATE WHERE clause issue @@ -8,33 +8,15 @@ fn test_update_where_clause_uses_old_data() { // Create test event simulating PostgreSQL logical replication UPDATE // with replica identity containing old key values let old_data = RowData::from_pairs(vec![ - ( - "id", - serde_json::Value::Number(serde_json::Number::from(42)), - ), - ( - "email", - serde_json::Value::String("old.email@example.com".to_string()), - ), + ("id", ColumnValue::text("42")), + ("email", ColumnValue::text("old.email@example.com")), ]); let new_data = RowData::from_pairs(vec![ - ( - "id", - 
serde_json::Value::Number(serde_json::Number::from(42)), - ), - ( - "email", - serde_json::Value::String("new.email@example.com".to_string()), - ), - ( - "name", - serde_json::Value::String("Updated Name".to_string()), - ), - ( - "age", - serde_json::Value::Number(serde_json::Number::from(30)), - ), + ("id", ColumnValue::text("42")), + ("email", ColumnValue::text("new.email@example.com")), + ("name", ColumnValue::text("Updated Name")), + ("age", ColumnValue::text("30")), ]); let event = ChangeEvent::update( @@ -58,43 +40,32 @@ fn test_update_where_clause_uses_old_data() { let old_data = old_data.as_ref().unwrap(); assert!(old_data.get("id").is_some()); assert!(old_data.get("email").is_some()); - assert_eq!( - old_data.get("id").unwrap(), - &serde_json::Value::Number(serde_json::Number::from(42)) - ); - assert_eq!( - old_data.get("email").unwrap(), - &serde_json::Value::String("old.email@example.com".to_string()) - ); + assert_eq!(old_data.get("id").unwrap(), "42"); + assert_eq!(old_data.get("email").unwrap(), "old.email@example.com"); // 2. Verify new_data contains the updated values assert!(new_data.get("id").is_some()); assert!(new_data.get("email").is_some()); assert!(new_data.get("name").is_some()); assert!(new_data.get("age").is_some()); - assert_eq!( - new_data.get("email").unwrap(), - &serde_json::Value::String("new.email@example.com".to_string()) - ); + assert_eq!(new_data.get("email").unwrap(), "new.email@example.com"); // 3. 
Simulate the logic our fixed code should use: // - SET clause should use new_data keys/values // - WHERE clause should use old_data keys/values - let set_map = new_data.clone().into_hash_map(); - let where_map = old_data.clone().into_hash_map(); - let set_keys: Vec<&String> = set_map.keys().collect(); - let where_keys: Vec<&String> = where_map.keys().collect(); + let set_keys: Vec<&str> = new_data.iter().map(|(k, _)| k.as_ref()).collect(); + let where_keys: Vec<&str> = old_data.iter().map(|(k, _)| k.as_ref()).collect(); // Verify SET clause would include all new columns - assert!(set_keys.contains(&&"id".to_string())); - assert!(set_keys.contains(&&"email".to_string())); - assert!(set_keys.contains(&&"name".to_string())); - assert!(set_keys.contains(&&"age".to_string())); + assert!(set_keys.contains(&"id")); + assert!(set_keys.contains(&"email")); + assert!(set_keys.contains(&"name")); + assert!(set_keys.contains(&"age")); // Verify WHERE clause would use replica identity (old_data) - assert!(where_keys.contains(&&"id".to_string())); - assert!(where_keys.contains(&&"email".to_string())); + assert!(where_keys.contains(&"id")); + assert!(where_keys.contains(&"email")); // The generated SQL should conceptually be: // UPDATE `public`.`users` @@ -113,14 +84,8 @@ fn test_update_where_clause_uses_old_data() { #[test] fn test_delete_where_clause_uses_old_data() { let old_data = RowData::from_pairs(vec![ - ( - "id", - serde_json::Value::Number(serde_json::Number::from(99)), - ), - ( - "username", - serde_json::Value::String("user_to_delete".to_string()), - ), + ("id", ColumnValue::text("99")), + ("username", ColumnValue::text("user_to_delete")), ]); let event = ChangeEvent::delete( @@ -151,11 +116,8 @@ fn test_delete_where_clause_uses_old_data() { #[test] fn test_update_fallback_when_no_old_data() { let new_data = RowData::from_pairs(vec![ - ( - "id", - serde_json::Value::Number(serde_json::Number::from(123)), - ), - ("name", serde_json::Value::String("New 
Name".to_string())), + ("id", ColumnValue::text("123")), + ("name", ColumnValue::text("New Name")), ]); let event = ChangeEvent::update( From 40b01eb1d4e54f11bf2500fd3c7a4b500799e43e Mon Sep 17 00:00:00 2001 From: Danielshih Date: Thu, 26 Feb 2026 01:04:59 +0000 Subject: [PATCH 2/2] Upgrade pg_walstream to version 0.5.1 and refactor replication handling --- Cargo.lock | 4 +- Cargo.toml | 2 +- pg2any-lib/src/client.rs | 73 +++------ pg2any-lib/src/lib.rs | 3 - pg2any-lib/src/replication_actor.rs | 237 ---------------------------- 5 files changed, 28 insertions(+), 291 deletions(-) delete mode 100644 pg2any-lib/src/replication_actor.rs diff --git a/Cargo.lock b/Cargo.lock index 1ac2d90..df15bc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1506,9 +1506,9 @@ dependencies = [ [[package]] name = "pg_walstream" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ada0907d94f64d9107377b397f5467d2af4e03a631bb9c25a9e0c33c0c5cdb5" +checksum = "c6227c83c13542bbfa8936e5a9f8f48c94c256cc6304b67decac921c4e645fab" dependencies = [ "bytes", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 4c26683..bd1f937 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ thiserror = "2.0.12" lazy_static = "1.5" tracing = "0.1.41" tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } -pg_walstream = "0.5.0" +pg_walstream = "0.5.1" tiberius = { version = "0.12.3", features = ["tds73", "sql-browser-tokio", "bigdecimal", "rust_decimal", "time", "chrono"] } sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "mysql", "sqlite", "chrono", "uuid"] } flate2 = "1.1.5" diff --git a/pg2any-lib/src/client.rs b/pg2any-lib/src/client.rs index 5caf241..dd280a1 100644 --- a/pg2any-lib/src/client.rs +++ b/pg2any-lib/src/client.rs @@ -3,7 +3,6 @@ use crate::destinations::{DestinationFactory, DestinationHandler}; use crate::error::{CdcError, Result}; use crate::lsn_tracker::{LsnTracker, SharedLsnFeedback}; use 
crate::monitoring::{MetricsCollector, MetricsCollectorTrait}; -use crate::replication_actor::{ActorCommand, ReplicationActorHandle}; use crate::transaction_manager::{ PendingTransactionFile, TransactionFileMetadata, TransactionManager, }; @@ -12,7 +11,7 @@ use chrono::{DateTime, Utc}; use pg_walstream::{LogicalReplicationStream, ReplicationStreamConfig}; use std::collections::{BinaryHeap, HashMap}; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, Mutex}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -78,8 +77,8 @@ pub struct CdcClient { lsn_tracker: Arc, /// Transaction file manager for file-based workflow transaction_file_manager: Arc, - /// Actor handle for the replication stream (Send + Sync safe) - replication_actor: ReplicationActorHandle, + /// Replication stream for PostgreSQL connection + replication_stream: Arc>, } impl CdcClient { @@ -128,7 +127,6 @@ impl CdcClient { let stream_config = ReplicationStreamConfig::from(&config); let replication_stream = LogicalReplicationStream::new(&config.source_connection_string, stream_config).await?; - let replication_actor = ReplicationActorHandle::spawn(replication_stream); let client = Self { config, @@ -139,7 +137,7 @@ impl CdcClient { metrics_collector: Arc::new(MetricsCollector::new()), lsn_tracker, transaction_file_manager: Arc::new(manager), - replication_actor, + replication_stream: Arc::new(Mutex::new(replication_stream)), }; Ok((client, start_lsn)) @@ -177,7 +175,11 @@ impl CdcClient { // Start the replication stream { let start_xlog = start_lsn.map(|lsn| lsn.0); - self.replication_actor.start(start_xlog).await?; + self.replication_stream + .lock() + .await + .start(start_xlog) + .await?; } // Start file-based workflow @@ -206,8 +208,11 @@ impl CdcClient { let (producer_shutdown_tx, producer_shutdown_rx) = oneshot::channel::<()>(); info!("Created producer shutdown notification channel"); - // Get shared_lsn_feedback from actor 
handle - let shared_lsn_feedback = self.replication_actor.shared_lsn_feedback.clone(); + // Get shared_lsn_feedback from stored replication_stream + let shared_lsn_feedback = { + let stream_guard = self.replication_stream.as_ref().lock().await; + stream_guard.shared_lsn_feedback.clone() + }; if let Some(ref mut handler) = self.destination_handler { info!("Processing pending transaction files from previous run (recovery)..."); @@ -230,8 +235,8 @@ impl CdcClient { } } - // Clone the actor command sender for the producer - let actor_cmd_tx = self.replication_actor.cmd_tx.clone(); + // Clone Arc of replication_stream for the producer + let replication_stream = self.replication_stream.clone(); // Start producer (writes to files only) let producer_handle = { @@ -242,7 +247,7 @@ impl CdcClient { let lsn_feedback = shared_lsn_feedback.clone(); tokio::spawn(Self::run_producer( - actor_cmd_tx, + replication_stream, token, start_lsn, metrics, @@ -467,7 +472,7 @@ impl CdcClient { /// 3. Consumer receives None from mpsc (channel closed) and processes remaining queue /// 4. 
Consumer then exits after draining all pending transactions async fn run_producer( - actor_cmd_tx: mpsc::Sender, + replication_stream: Arc>, cancellation_token: CancellationToken, start_lsn: Lsn, metrics_collector: Arc, @@ -532,27 +537,10 @@ impl CdcClient { } while !cancellation_token.is_cancelled() { - // Get the next event from the replication actor + // Get the next event from the replication stream (lock for the duration of the call) let event_result = { - let (tx, rx) = oneshot::channel(); - if actor_cmd_tx - .send(ActorCommand::NextEvent { - cancel: cancellation_token.clone(), - reply: tx, - }) - .await - .is_err() - { - error!("Replication actor has shut down"); - break; - } - match rx.await { - Ok(r) => r, - Err(_) => { - error!("Replication actor dropped reply"); - break; - } - } + let mut stream = replication_stream.lock().await; + stream.next_event_with_retry(&cancellation_token).await }; match event_result { @@ -1243,25 +1231,14 @@ impl CdcClient { info!("Both producer and consumer completed gracefully"); { info!("Sending final ACK to PostgreSQL before shutdown"); - self.replication_actor + let mut stream = self.replication_stream.as_ref().lock().await; + stream .shared_lsn_feedback .log_state("Final shutdown - LSN state before ACK"); - if let Err(e) = self.replication_actor.send_feedback().await { - warn!("Failed to send final feedback: {}", e); - } - - match self.replication_actor.current_lsn().await { - Ok(lsn) => info!( - "Stopping logical replication stream (last received LSN: {})", - pg_walstream::format_lsn(lsn) - ), - Err(e) => warn!("Could not get current LSN: {}", e), - } - - if let Err(e) = self.replication_actor.stop().await { + if let Err(e) = stream.stop().await { error!("Failed to stop replication stream: {}", e); - return Err(e); + return Err(CdcError::from(e)); } info!("Final ACK sent successfully to PostgreSQL"); diff --git a/pg2any-lib/src/lib.rs b/pg2any-lib/src/lib.rs index 8d8ce54..0ace78d 100644 --- a/pg2any-lib/src/lib.rs +++ 
b/pg2any-lib/src/lib.rs @@ -66,9 +66,6 @@ pub mod types; pub mod lsn_tracker; -// Actor-based replication stream wrapper (Send + Sync safe) -pub mod replication_actor; - // High-level client interface pub mod client; diff --git a/pg2any-lib/src/replication_actor.rs b/pg2any-lib/src/replication_actor.rs deleted file mode 100644 index a47bec7..0000000 --- a/pg2any-lib/src/replication_actor.rs +++ /dev/null @@ -1,237 +0,0 @@ -//! Replication stream actor — owns `LogicalReplicationStream` on a dedicated -//! single-threaded runtime so the non-`Sync` libpq connection never crosses -//! thread boundaries. -//! -//! # Architecture -//! -//! ```text -//! ┌─────────────────────────┐ mpsc::channel -//! │ ReplicationActorHandle│ ───Command──►┐ -//! │ (Send + Sync) │ │ -//! └─────────────────────────┘ ▼ -//! ┌────────────────────┐ -//! │ Dedicated Thread │ -//! │ (single-threaded │ -//! │ tokio runtime) │ -//! │ │ -//! │ LogicalReplication │ -//! │ Stream (owned) │ -//! └────────────────────┘ -//! │ -//! Events / Results -//! │ -//! ▼ -//! mpsc::channel / oneshot -//! ``` -//! -//! The handle is fully `Send + Sync` and can be used freely from any tokio task. - -use crate::error::{CdcError, Result}; -use pg_walstream::{LogicalReplicationStream, SharedLsnFeedback}; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; -use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info}; - -/// Re-export for convenience. -pub use pg_walstream::types::ChangeEvent; - -pub enum ActorCommand { - /// Start the replication stream with an optional starting LSN. - Start { - start_lsn: Option, - reply: oneshot::Sender>, - }, - /// Read the next event (with automatic retry / keepalive handling). - NextEvent { - cancel: CancellationToken, - reply: oneshot::Sender>, - }, - /// Send feedback (standby status update) to PostgreSQL. - SendFeedback { reply: oneshot::Sender> }, - /// Get the current LSN position. 
- CurrentLsn { reply: oneshot::Sender }, - /// Gracefully stop the replication stream. - Stop { reply: oneshot::Sender> }, -} - -// --------------------------------------------------------------------------- -// Actor handle — the public, Send + Sync interface -// --------------------------------------------------------------------------- - -/// A `Send + Sync` handle to the replication actor. -/// -/// All interaction with the underlying `LogicalReplicationStream` goes through message-passing, so no `Sync` bound is required on the stream itself. -pub struct ReplicationActorHandle { - pub cmd_tx: mpsc::Sender, - /// Thread join handle so the caller can wait for the actor to shut down. - join_handle: Option>, - /// Shared LSN feedback — thread-safe, can be used directly from any task. - pub shared_lsn_feedback: Arc, -} - -impl ReplicationActorHandle { - /// Spawn a new actor that owns the given `LogicalReplicationStream`. - /// - /// It's own single-threaded tokio runtime, so libpq's `*mut PGconn` never needs to be `Send`/`Sync`. - pub fn spawn(stream: LogicalReplicationStream) -> Self { - let shared_lsn_feedback = stream.shared_lsn_feedback.clone(); - // Bounded command channel — back-pressure if the actor is busy. - let (cmd_tx, cmd_rx) = mpsc::channel::(32); - - let join_handle = std::thread::Builder::new() - .name("replication-actor".into()) - .spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("Failed to create actor runtime"); - - rt.block_on(actor_loop(stream, cmd_rx)); - }) - .expect("Failed to spawn replication actor thread"); - - Self { - cmd_tx, - join_handle: Some(join_handle), - shared_lsn_feedback, - } - } - - /// Start the replication stream. 
- pub async fn start(&self, start_lsn: Option) -> Result<()> { - let (tx, rx) = oneshot::channel(); - self.cmd_tx - .send(ActorCommand::Start { - start_lsn, - reply: tx, - }) - .await - .map_err(|_| CdcError::generic("Replication actor has shut down"))?; - rx.await - .map_err(|_| CdcError::generic("Replication actor dropped reply"))? - } - - /// Read the next replication event (blocks until one is available or cancelled). - pub async fn next_event(&self, cancel: &CancellationToken) -> Result { - let (tx, rx) = oneshot::channel(); - self.cmd_tx - .send(ActorCommand::NextEvent { - cancel: cancel.clone(), - reply: tx, - }) - .await - .map_err(|_| CdcError::generic("Replication actor has shut down"))?; - rx.await - .map_err(|_| CdcError::generic("Replication actor dropped reply"))? - } - - /// Send feedback / standby status update to PostgreSQL. - pub async fn send_feedback(&self) -> Result<()> { - let (tx, rx) = oneshot::channel(); - self.cmd_tx - .send(ActorCommand::SendFeedback { reply: tx }) - .await - .map_err(|_| CdcError::generic("Replication actor has shut down"))?; - rx.await - .map_err(|_| CdcError::generic("Replication actor dropped reply"))? - } - - /// Get the current LSN position. - pub async fn current_lsn(&self) -> Result { - let (tx, rx) = oneshot::channel(); - self.cmd_tx - .send(ActorCommand::CurrentLsn { reply: tx }) - .await - .map_err(|_| CdcError::generic("Replication actor has shut down"))?; - rx.await - .map_err(|_| CdcError::generic("Replication actor dropped reply")) - } - - /// Gracefully stop the replication stream and shut down the actor thread. - pub async fn stop(&self) -> Result<()> { - let (tx, rx) = oneshot::channel(); - // If the actor is already gone, that's OK. - let _ = self.cmd_tx.send(ActorCommand::Stop { reply: tx }).await; - rx.await - .map_err(|_| CdcError::generic("Replication actor dropped reply"))? - } - - /// Wait for the actor thread to exit (call after `stop`). 
- pub fn join(&mut self) { - if let Some(handle) = self.join_handle.take() { - if let Err(e) = handle.join() { - error!("Replication actor thread panicked: {:?}", e); - } - } - } -} - -impl Drop for ReplicationActorHandle { - fn drop(&mut self) { - // Best-effort: try to tell the actor to stop. If the channel is already closed the actor is gone and we don't need to do anything. - let _ = self.cmd_tx.try_send(ActorCommand::Stop { - reply: oneshot::channel().0, - }); - self.join(); - } -} - -// --------------------------------------------------------------------------- -// Actor event loop — runs on the dedicated thread -// --------------------------------------------------------------------------- - -async fn actor_loop( - mut stream: LogicalReplicationStream, - mut cmd_rx: mpsc::Receiver, -) { - info!("Replication actor started on dedicated thread"); - - while let Some(cmd) = cmd_rx.recv().await { - match cmd { - ActorCommand::Start { start_lsn, reply } => { - let result = stream.start(start_lsn).await.map_err(CdcError::from); - let _ = reply.send(result); - } - - ActorCommand::NextEvent { cancel, reply } => { - let result = stream - .next_event_with_retry(&cancel) - .await - .map_err(CdcError::from); - let _ = reply.send(result); - } - - ActorCommand::SendFeedback { reply } => { - let result = stream.send_feedback().await.map_err(CdcError::from); - let _ = reply.send(result); - } - - ActorCommand::CurrentLsn { reply } => { - let _ = reply.send(stream.current_lsn()); - } - - ActorCommand::Stop { reply } => { - info!("Replication actor received stop command"); - let result = stream.stop().await.map_err(CdcError::from); - let _ = reply.send(result); - break; // Exit the actor loop - } - } - } - - debug!("Replication actor loop exited"); -} - -#[cfg(test)] -mod tests { - use super::*; - - /// Verify that `ReplicationActorHandle` is Send + Sync (compile-time check). 
- fn _assert_send_sync() { - fn assert_send() {} - fn assert_sync() {} - assert_send::(); - assert_sync::(); - } -}