Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ thiserror = "2.0.12"
lazy_static = "1.5"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
pg_walstream = "0.4.1"
pg_walstream = "0.5.1"
tiberius = { version = "0.12.3", features = ["tds73", "sql-browser-tokio", "bigdecimal", "rust_decimal", "time", "chrono"] }
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "mysql", "sqlite", "chrono", "uuid"] }
flate2 = "1.1.5"
Expand Down
9 changes: 0 additions & 9 deletions pg2any-lib/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1236,15 +1236,6 @@ impl CdcClient {
.shared_lsn_feedback
.log_state("Final shutdown - LSN state before ACK");

if let Err(e) = stream.send_feedback() {
warn!("Failed to send final feedback: {}", e);
}

info!(
"Stopping logical replication stream (last received LSN: {})",
pg_walstream::format_lsn(stream.current_lsn())
);

if let Err(e) = stream.stop().await {
error!("Failed to stop replication stream: {}", e);
return Err(CdcError::from(e));
Expand Down
1 change: 0 additions & 1 deletion pg2any-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ pub mod destinations;
pub use pg_walstream::{
// Type aliases and utilities
format_lsn,
format_postgres_timestamp,
// Protocol types
message_types,
parse_lsn,
Expand Down
56 changes: 25 additions & 31 deletions pg2any-lib/src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::storage::{CompressionIndex, SqlStreamParser, StorageFactory, Transact
use crate::types::{ChangeEvent, DestinationType, EventType, Lsn, ReplicaIdentity, RowData};
use async_compression::tokio::bufread::GzipDecoder;
use chrono::{DateTime, Utc};
use pg_walstream::ColumnValue;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -1046,16 +1047,13 @@ impl TransactionManager {

/// Generate INSERT SQL command
///
/// Accepts `&RowData` directly. Converts to HashMap internally because
/// column iteration is required (RowData does not yet expose `iter()`).
/// Accepts `&RowData` directly. Uses `iter()` for column iteration.
fn generate_insert_sql(&self, schema: &str, table: &str, new_data: &RowData) -> Result<String> {
let schema = self.map_schema(Some(schema));

let (columns, values): (Vec<String>, Vec<String>) = new_data
.clone()
.into_hash_map()
.into_iter()
.map(|(k, v)| (k, self.format_value(&v)))
.iter()
.map(|(k, v)| (k.to_string(), self.format_value(v)))
.unzip();

let sql = match self.destination_type {
Expand Down Expand Up @@ -1114,9 +1112,7 @@ impl TransactionManager {
) -> Result<String> {
let schema = self.map_schema(Some(schema));

// Convert to HashMap for SET clause iteration (RowData lacks iter())
let new_data_map = new_data.clone().into_hash_map();
let set_clause: Vec<String> = new_data_map
let set_clause: Vec<String> = new_data
.iter()
.map(|(col, val)| {
let formatted_col = match self.destination_type {
Expand Down Expand Up @@ -1259,13 +1255,11 @@ impl TransactionManager {
.collect::<Result<Vec<_>>>()?
}
ReplicaIdentity::Full => {
// Use all columns from old data — must convert to HashMap for iteration
// Use all columns from old data
let data = old_data.ok_or_else(|| {
CdcError::Generic("FULL replica identity requires old data".to_string())
})?;
let data_map = data.clone().into_hash_map();
data_map
.iter()
data.iter()
.map(|(col, val)| {
let formatted_col = match self.destination_type {
DestinationType::MySQL => format!("`{col}`"),
Expand All @@ -1290,27 +1284,27 @@ impl TransactionManager {
Ok(conditions.join(" AND "))
}

/// Format a JSON value as SQL literal
fn format_value(&self, value: &serde_json::Value) -> String {
/// Format a `ColumnValue` as a SQL literal
fn format_value(&self, value: &ColumnValue) -> String {
match value {
serde_json::Value::Null => "NULL".to_string(),
serde_json::Value::Bool(b) => {
if *b {
"TRUE".to_string()
} else {
"FALSE".to_string()
ColumnValue::Null => "NULL".to_string(),
ColumnValue::Text(b) => {
match std::str::from_utf8(b) {
Ok(s) => {
// Escape single quotes by doubling them (SQL standard)
let escaped = s.replace('\'', "''");
format!("'{escaped}'")
}
Err(_) => {
// Non-UTF-8 text: hex-encode as binary literal
let hex: String = b.iter().map(|byte| format!("{byte:02x}")).collect();
format!("X'{hex}'")
}
}
}
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::String(s) => {
// Escape single quotes by doubling them (SQL standard)
let escaped = s.replace('\'', "''");
format!("'{escaped}'")
}
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
// For complex types, serialize as JSON string
let json_str = value.to_string().replace('\'', "''");
format!("'{json_str}'")
ColumnValue::Binary(b) => {
let hex: String = b.iter().map(|byte| format!("{byte:02x}")).collect();
format!("X'{hex}'")
}
}
}
Expand Down
52 changes: 17 additions & 35 deletions pg2any-lib/tests/destination_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use pg2any_lib::{
types::{ChangeEvent, EventType, ReplicaIdentity},
DestinationType,
};
use pg_walstream::{Lsn, RowData};
use pg_walstream::{ColumnValue, Lsn, RowData};
use std::sync::Arc;

/// Test that destination handlers have consistent interfaces
Expand Down Expand Up @@ -103,23 +103,23 @@ fn test_unsupported_destination_types() {
// Helper functions to create test events
/// Build a sample INSERT event on `public.test_table` for handler tests.
fn create_test_event() -> ChangeEvent {
    let columns = vec![
        ("id", ColumnValue::text("1")),
        ("name", ColumnValue::text("test")),
        ("active", ColumnValue::text("t")),
    ];
    let row = RowData::from_pairs(columns);
    ChangeEvent::insert("public", "test_table", 456, row, Lsn::from(100))
}

fn create_update_event() -> ChangeEvent {
let old_data = RowData::from_pairs(vec![
("id", serde_json::Value::Number(serde_json::Number::from(1))),
("name", serde_json::Value::String("old_name".to_string())),
("id", ColumnValue::text("1")),
("name", ColumnValue::text("old_name")),
]);

let new_data = RowData::from_pairs(vec![
("id", serde_json::Value::Number(serde_json::Number::from(1))),
("name", serde_json::Value::String("new_name".to_string())),
("id", ColumnValue::text("1")),
("name", ColumnValue::text("new_name")),
]);

ChangeEvent::update(
Expand All @@ -136,11 +136,8 @@ fn create_update_event() -> ChangeEvent {

fn create_delete_event() -> ChangeEvent {
let old_data = RowData::from_pairs(vec![
("id", serde_json::Value::Number(serde_json::Number::from(1))),
(
"name",
serde_json::Value::String("deleted_name".to_string()),
),
("id", ColumnValue::text("1")),
("name", ColumnValue::text("deleted_name")),
]);

ChangeEvent::delete(
Expand All @@ -156,8 +153,8 @@ fn create_delete_event() -> ChangeEvent {

fn create_update_event_without_old_data() -> ChangeEvent {
let new_data = RowData::from_pairs(vec![
("id", serde_json::Value::Number(serde_json::Number::from(1))),
("name", serde_json::Value::String("new_name".to_string())),
("id", ColumnValue::text("1")),
("name", ColumnValue::text("new_name")),
]);

ChangeEvent::update(
Expand Down Expand Up @@ -198,18 +195,12 @@ fn test_mysql_destination_update_with_old_data() {

// Verify that old_data contains key information for WHERE clause
assert!(old_data.get("id").is_some());
assert_eq!(
old_data.get("id").unwrap(),
&serde_json::Value::Number(serde_json::Number::from(1))
);
assert_eq!(old_data.get("id").unwrap(), "1");

// Verify that new_data contains updated information
assert!(new_data.get("id").is_some());
assert!(new_data.get("name").is_some());
assert_eq!(
new_data.get("name").unwrap(),
&serde_json::Value::String("new_name".to_string())
);
assert_eq!(new_data.get("name").unwrap(), "new_name");
} else {
panic!("Expected Update event");
}
Expand Down Expand Up @@ -254,18 +245,12 @@ fn test_sqlserver_destination_update_with_old_data() {

// Verify that old_data contains key information for WHERE clause
assert!(old_data.get("id").is_some());
assert_eq!(
old_data.get("id").unwrap(),
&serde_json::Value::Number(serde_json::Number::from(1))
);
assert_eq!(old_data.get("id").unwrap(), "1");

// Verify that new_data contains updated information
assert!(new_data.get("id").is_some());
assert!(new_data.get("name").is_some());
assert_eq!(
new_data.get("name").unwrap(),
&serde_json::Value::String("new_name".to_string())
);
assert_eq!(new_data.get("name").unwrap(), "new_name");
}
_ => panic!("Expected Update event"),
}
Expand All @@ -281,10 +266,7 @@ fn test_delete_event_uses_old_data() {
EventType::Delete { old_data, .. } => {
assert!(old_data.get("id").is_some());
assert!(old_data.get("name").is_some());
assert_eq!(
old_data.get("name").unwrap(),
&serde_json::Value::String("deleted_name".to_string())
);
assert_eq!(old_data.get("name").unwrap(), "deleted_name");
}
_ => panic!("Expected Delete event"),
}
Expand Down
28 changes: 21 additions & 7 deletions pg2any-lib/tests/event_type_refactor_tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use chrono::Utc;
use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity};
use pg_walstream::{Lsn, RowData};
use serde_json::json;
use pg_walstream::{ColumnValue, Lsn, RowData};
use std::sync::Arc;

#[test]
fn test_new_event_type_insert() {
let data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("test"))]);
let data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("test")),
]);

let event = ChangeEvent::insert("public", "users", 123, data.clone(), Lsn::from(100));

Expand All @@ -28,9 +30,15 @@ fn test_new_event_type_insert() {

#[test]
fn test_new_event_type_update() {
let old_data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("old_name"))]);
let old_data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("old_name")),
]);

let new_data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("new_name"))]);
let new_data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("new_name")),
]);

let event = ChangeEvent::update(
"public",
Expand Down Expand Up @@ -68,7 +76,10 @@ fn test_new_event_type_update() {

#[test]
fn test_new_event_type_delete() {
let old_data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("deleted_name"))]);
let old_data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("deleted_name")),
]);

let event = ChangeEvent::delete(
"public",
Expand Down Expand Up @@ -145,7 +156,10 @@ fn test_new_event_type_truncate() {

#[test]
fn test_event_serialization() {
let data = RowData::from_pairs(vec![("id", json!(1)), ("name", json!("test"))]);
let data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("test")),
]);

let event = ChangeEvent::insert("public", "users", 123, data, Lsn::from(100));

Expand Down
20 changes: 9 additions & 11 deletions pg2any-lib/tests/metrics_logical_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pg2any_lib::monitoring::{
ProcessingTimerTrait,
};
use pg2any_lib::types::{ChangeEvent, ReplicaIdentity};
use pg_walstream::ColumnValue;
use pg_walstream::Lsn;
use pg_walstream::RowData;
use std::sync::Arc;
Expand All @@ -18,22 +19,22 @@ use std::time::Duration;
/// Helper function to create test change events
fn create_test_insert_event() -> ChangeEvent {
    let columns = vec![
        ("id", ColumnValue::text("1")),
        ("name", ColumnValue::text("test")),
    ];
    let row = RowData::from_pairs(columns);
    ChangeEvent::insert("public", "users", 12345, row, Lsn::from(100))
}

fn create_test_update_event() -> ChangeEvent {
let old_data = RowData::from_pairs(vec![
("id", serde_json::Value::Number(serde_json::Number::from(1))),
("name", serde_json::Value::String("old_name".to_string())),
("id", ColumnValue::text("1")),
("name", ColumnValue::text("old_name")),
]);

let new_data = RowData::from_pairs(vec![
("id", serde_json::Value::Number(serde_json::Number::from(1))),
("name", serde_json::Value::String("new_name".to_string())),
("id", ColumnValue::text("1")),
("name", ColumnValue::text("new_name")),
]);

ChangeEvent::update(
Expand All @@ -50,11 +51,8 @@ fn create_test_update_event() -> ChangeEvent {

fn create_test_delete_event() -> ChangeEvent {
let old_data = RowData::from_pairs(vec![
("id", serde_json::Value::Number(serde_json::Number::from(1))),
(
"name",
serde_json::Value::String("deleted_name".to_string()),
),
("id", ColumnValue::text("1")),
("name", ColumnValue::text("deleted_name")),
]);

ChangeEvent::delete(
Expand Down
Loading