Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ jobs:

cargo test --test safe_transaction_consumer -- --ignored --nocapture --test-threads=1

PGPASSWORD=postgres psql -h localhost -U postgres -d test_walstream \
-c "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_type = 'logical';" || true

cargo test --test complex_types -- --ignored --nocapture --test-threads=1

publish:
name: Publish to crates.io
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ categories = ["database", "parsing", "network-programming"]
tokio = { version = "1.49.0", features = ["io-util", "net", "time", "macros"] }
tokio-util = { version = "0.7.18", features = ["compat"] }
serde = { version = "1.0.228", features = ["derive", "rc"] }
serde_json = "1.0.149"
chrono = { version = "0.4.43", features = ["serde"] }
chrono = { version = "0.4.44", features = ["serde"] }
bytes = "1.11.1"
tracing = "0.1.44"
libpq-sys = "0.8"
Expand All @@ -27,8 +26,9 @@ thiserror = "2.0.18"
default = []

[dev-dependencies]
tokio = { version = "1.47.2", features = ["full"] }
tokio = { version = "1.49.0", features = ["full"] }
criterion = { version = "0.8.2", features = ["html_reports"] }
serde_json = "1.0.149"

[[test]]
name = "snapshot_export"
Expand All @@ -42,6 +42,10 @@ path = "integration-tests/rate_limited_streaming.rs"
name = "safe_transaction_consumer"
path = "integration-tests/safe_transaction_consumer.rs"

[[test]]
name = "complex_types"
path = "integration-tests/complex_types.rs"

[[bench]]
name = "rowdata_vs_hashmap"
name = "columnvalue_vs_json"
harness = false
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

### Working with Event Data

Events carry row data as `RowData` — an ordered list of `(Arc<str>, Value)` pairs.
Events carry row data as [`RowData`] — an ordered list of `(Arc<str>, ColumnValue)` pairs.
[`ColumnValue`] is a lightweight enum (`Null | Text(Bytes) | Binary(Bytes)`) that preserves
the raw PostgreSQL wire representation with zero-copy semantics.
Schema, table, and column names are `Arc<str>` (reference-counted, zero-cost cloning):

```rust
use pg_walstream::{EventType, RowData};
use pg_walstream::{EventType, RowData, ColumnValue};

// Pattern match on event types
match &event.event_type {
Expand Down Expand Up @@ -367,7 +369,9 @@ The library supports all PostgreSQL logical replication message types:

- **Zero-Copy**: Uses `bytes::Bytes` for efficient buffer management
- **Arc-shared column metadata**: Column names, schema, and table names use `Arc<str>` — cloning is a single atomic increment instead of a heap allocation per event
- **RowData (ordered Vec)**: Row payloads use `RowData` (a `Vec<(Arc<str>, Value)>`) instead of `HashMap<String, Value>`, eliminating per-event hashing overhead and extra allocations
- **RowData (ordered Vec)**: Row payloads use `RowData` (a `Vec<(Arc<str>, ColumnValue)>`) instead of `HashMap<String, serde_json::Value>`, eliminating per-event hashing overhead and extra allocations
- **ColumnValue (Null | Text | Binary)**: Preserves the raw PostgreSQL wire representation without intermediate JSON parsing or allocation. Each variant holds zero-copy `Bytes`
- **Binary Wire Format**: `ChangeEvent::encode` / `ChangeEvent::decode` provide a compact binary serialization that is significantly faster and smaller than `serde_json`, ideal for inter-process or network transport
- **Atomic Operations**: Thread-safe LSN tracking with minimal overhead
- **Connection Pooling**: Reusable connection with automatic retry
- **Streaming Support**: Handle large transactions without memory issues
Expand Down
288 changes: 288 additions & 0 deletions benches/columnvalue_vs_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
//! Benchmark: JSON serialization (serde_json) vs Binary serialization (ColumnValue encode/decode)
//!
//! Measures ChangeEvent performance across two serialization strategies:
//!
//! 1. **JSON (serde_json)**: `serde_json::to_vec` / `serde_json::from_slice`
//! 2. **Binary (ColumnValue)**: `ChangeEvent::encode` / `ChangeEvent::decode`
//!
//! Benchmark groups:
//! - `construct` — Build event: HashMap<String,Value> vs RowData<Arc<str>,ColumnValue>
//! - `serialize` — Encode event to bytes: serde_json vs binary
//! - `deserialize` — Decode bytes back to event: serde_json vs binary
//! - `round_trip` — Full encode → decode cycle
//! - `pipeline` — Realistic CDC: construct → clone → lookup → serialize
//!
//! Run:
//! cargo bench --bench columnvalue_vs_json

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use pg_walstream::types::{ChangeEvent, ColumnValue, EventType, Lsn, RowData};
use serde_json::{self, Value};
use std::collections::HashMap;
use std::hint::black_box;
use std::sync::Arc;

/// Old-style event: HashMap<String, serde_json::Value> (pre-ColumnValue approach).
///
/// Baseline representation for the benchmarks: row data is keyed by owned
/// `String` column names and stored as parsed JSON values, so every event pays
/// per-key hashing plus a heap allocation per column name.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct OldChangeEvent {
// Source relation identifiers as owned strings (allocated per event).
schema: String,
table: String,
// Numeric relation identifier (presumably the PostgreSQL relation OID — see build_old_event's 16384).
relation_oid: u32,
// Row payload: column name → parsed JSON value.
data: HashMap<String, Value>,
// Log position of the change as a raw u64 (the new path wraps this in `Lsn`).
lsn: u64,
}

/// Build an old-style event using HashMap + serde_json::Value.
fn build_old_event(n_columns: usize) -> OldChangeEvent {
let mut data = HashMap::with_capacity(n_columns);
for i in 0..n_columns {
data.insert(format!("column_{i}"), serde_json::json!(i.to_string()));
}
OldChangeEvent {
schema: "public".to_string(),
table: "users".to_string(),
relation_oid: 16384,
data,
lsn: 0x16B374D848,
}
}

/// Build a new-style ChangeEvent using RowData + ColumnValue.
///
/// `shared_names` simulates pre-cached `Arc<str>` column names from
/// RelationInfo: each push is a refcount bump rather than a fresh allocation.
/// Values mirror `build_old_event` — the column index rendered as text.
fn build_new_event(shared_names: &[Arc<str>]) -> ChangeEvent {
    let mut row = RowData::with_capacity(shared_names.len());
    for (idx, col_name) in shared_names.iter().enumerate() {
        row.push(Arc::clone(col_name), ColumnValue::text(&idx.to_string()));
    }
    ChangeEvent::insert("public", "users", 16384, row, Lsn::new(0x16B374D848))
}

/// Pre-create shared column names (mirrors what RelationInfo holds in production).
///
/// Returns `n` names of the form `column_0..column_{n-1}`, each as an
/// `Arc<str>` so benchmark iterations clone them cheaply.
fn shared_column_names(n: usize) -> Vec<Arc<str>> {
    let mut names = Vec::with_capacity(n);
    for i in 0..n {
        names.push(Arc::from(format!("column_{i}")));
    }
    names
}

const COLUMN_COUNTS: [usize; 4] = [5, 10, 20, 50];

// ---------------------------------------------------------------------------
// 1. Construction: HashMap+Value vs RowData+ColumnValue
// ---------------------------------------------------------------------------

/// Compare event construction cost.
///
/// `json_hashmap` builds the legacy HashMap+serde_json event from scratch;
/// `binary_columnvalue` builds the RowData event from pre-shared `Arc<str>`
/// names, isolating the per-event allocation difference.
fn bench_construct(c: &mut Criterion) {
    let mut group = c.benchmark_group("construct");

    for &n_cols in COLUMN_COUNTS.iter() {
        let names = shared_column_names(n_cols);

        group.bench_with_input(BenchmarkId::new("json_hashmap", n_cols), &n_cols, |b, &n| {
            b.iter(|| black_box(build_old_event(n)))
        });

        group.bench_with_input(BenchmarkId::new("binary_columnvalue", n_cols), &names, |b, cols| {
            b.iter(|| black_box(build_new_event(cols)))
        });
    }

    group.finish();
}

// ---------------------------------------------------------------------------
// 2. Serialize: serde_json::to_vec vs ChangeEvent::encode
// ---------------------------------------------------------------------------

/// Compare serialization: JSON vs binary encoding.
///
/// Both strategies serialize the same pre-built `ChangeEvent`; only the
/// encoding work (plus its output buffer allocation) is inside the timed loop.
fn bench_serialize(c: &mut Criterion) {
    let mut group = c.benchmark_group("serialize");

    for &n_cols in COLUMN_COUNTS.iter() {
        let event = build_new_event(&shared_column_names(n_cols));

        // JSON serialize (new ChangeEvent via serde)
        group.bench_with_input(BenchmarkId::new("json_serde", n_cols), &event, |b, ev| {
            b.iter(|| black_box(serde_json::to_vec(ev).unwrap()))
        });

        // Binary encode (ChangeEvent::encode)
        group.bench_with_input(BenchmarkId::new("binary_encode", n_cols), &event, |b, ev| {
            b.iter(|| {
                let mut buf = bytes::BytesMut::with_capacity(256);
                ev.encode(&mut buf);
                black_box(buf)
            })
        });
    }

    group.finish();
}

// ---------------------------------------------------------------------------
// 3. Deserialize: serde_json::from_slice vs ChangeEvent::decode
// ---------------------------------------------------------------------------

/// Compare deserialization: JSON vs binary decoding.
///
/// Each format is serialized once outside the timed loop so only the decode
/// path is measured.
fn bench_deserialize(c: &mut Criterion) {
    let mut group = c.benchmark_group("deserialize");

    for &n_cols in COLUMN_COUNTS.iter() {
        let event = build_new_event(&shared_column_names(n_cols));

        // Pre-serialize both representations up front.
        let json_bytes = serde_json::to_vec(&event).unwrap();
        let mut buf = bytes::BytesMut::with_capacity(256);
        event.encode(&mut buf);
        let binary_bytes = buf.freeze();

        // JSON deserialize (new ChangeEvent via serde)
        group.bench_with_input(BenchmarkId::new("json_serde", n_cols), &json_bytes, |b, bytes| {
            b.iter(|| black_box(serde_json::from_slice::<ChangeEvent>(bytes).unwrap()))
        });

        // Binary decode (ChangeEvent::decode)
        group.bench_with_input(BenchmarkId::new("binary_decode", n_cols), &binary_bytes, |b, bytes| {
            b.iter(|| black_box(ChangeEvent::decode(bytes).unwrap()))
        });
    }

    group.finish();
}

// ---------------------------------------------------------------------------
// 4. Round-trip: serialize → deserialize
// ---------------------------------------------------------------------------

/// Compare full encode → decode round-trip.
///
/// Measures serialize-then-deserialize of the same event, i.e. the total cost
/// of shipping an event through each wire format.
fn bench_round_trip(c: &mut Criterion) {
    let mut group = c.benchmark_group("round_trip");

    for &n_cols in COLUMN_COUNTS.iter() {
        let event = build_new_event(&shared_column_names(n_cols));

        // JSON round-trip (new ChangeEvent via serde)
        group.bench_with_input(BenchmarkId::new("json_serde", n_cols), &event, |b, ev| {
            b.iter(|| {
                let encoded = serde_json::to_vec(ev).unwrap();
                black_box(serde_json::from_slice::<ChangeEvent>(&encoded).unwrap())
            })
        });

        // Binary round-trip (encode → decode)
        group.bench_with_input(BenchmarkId::new("binary_encode_decode", n_cols), &event, |b, ev| {
            b.iter(|| {
                let mut buf = bytes::BytesMut::with_capacity(256);
                ev.encode(&mut buf);
                black_box(ChangeEvent::decode(&buf).unwrap())
            })
        });
    }

    group.finish();
}

// ---------------------------------------------------------------------------
// 5. Realistic CDC pipeline: construct → clone → lookup → serialize
// ---------------------------------------------------------------------------

/// End-to-end CDC simulation: construct event, clone it, look up 3 columns,
/// then serialize to the target format.
fn bench_pipeline(c: &mut Criterion) {
let mut group = c.benchmark_group("pipeline");

for n_cols in COLUMN_COUNTS {
let names = shared_column_names(n_cols);

// New path: RowData + ColumnValue → JSON serde
group.bench_with_input(
BenchmarkId::new("json_serde", n_cols),
&names,
|b, names| {
b.iter(|| {
let event = build_new_event(names);
let cloned = event.clone();
if let EventType::Insert { ref data, .. } = cloned.event_type {
let _ = black_box(data.get("column_0"));
let _ = black_box(data.get("column_1"));
let _ = black_box(data.get("column_2"));
}
let out = serde_json::to_vec(&cloned).unwrap();
black_box(out);
});
},
);

// New path: RowData + ColumnValue → binary encode
group.bench_with_input(
BenchmarkId::new("binary_encode", n_cols),
&names,
|b, names| {
b.iter(|| {
let event = build_new_event(names);
let cloned = event.clone();
if let EventType::Insert { ref data, .. } = cloned.event_type {
let _ = black_box(data.get("column_0"));
let _ = black_box(data.get("column_1"));
let _ = black_box(data.get("column_2"));
}
let mut buf = bytes::BytesMut::with_capacity(256);
cloned.encode(&mut buf);
black_box(buf);
});
},
);
}

group.finish();
}

// Register every benchmark group and let criterion generate the `main` entry point.
criterion_group!(
benches,
bench_construct,
bench_serialize,
bench_deserialize,
bench_round_trip,
bench_pipeline,
);
criterion_main!(benches);
Loading