feat: Add redis streams exporter, with cluster, tls support #314

Open
brightsparc wants to merge 9 commits into streamfold:main from brightsparc:julian/redis-streams

Conversation

brightsparc (Contributor) commented Mar 3, 2026

Summary

Adds a new Redis Stream exporter that writes OTLP trace data to Redis Streams via XADD. This enables downstream consumers to process span data in real-time using Redis consumer groups. The exporter supports attribute-based stream key routing, both flat and JSON serialization modes, and works with standalone Redis, clustered Redis, GCP Managed Redis (via custom CA), and Valkey.

Motivation

Redis Streams provide a lightweight, high-throughput append-only log with built-in consumer group semantics. For teams already running Redis, this exporter offers a simpler alternative to Kafka for real-time span processing pipelines — no additional infrastructure required.

Key use case: route spans to per-service streams using attribute-based key templates (e.g., traces:{service.name}), allowing independent consumer groups per service.

Usage

# Basic usage
rotel start \
  --exporter redis-stream \
  --redis-stream-exporter-endpoint redis://localhost:6379

# With attribute-based stream routing and flat format
rotel start \
  --exporter redis-stream \
  --redis-stream-exporter-endpoint rediss://redis.example.com:6380 \
  --redis-stream-exporter-stream-key-template "traces:{service.name}" \
  --redis-stream-exporter-format flat \
  --redis-stream-exporter-maxlen 100000

# Cluster mode with auth
rotel start \
  --exporter redis-stream \
  --redis-stream-exporter-endpoint redis://node1:6379,redis://node2:6379 \
  --redis-stream-exporter-cluster-mode true \
  --redis-stream-exporter-username myuser \
  --redis-stream-exporter-password mypass

Or via environment variables:

export ROTEL_EXPORTER=redis-stream
export ROTEL_REDIS_STREAM_EXPORTER_ENDPOINT=redis://localhost:6379
export ROTEL_REDIS_STREAM_EXPORTER_STREAM_KEY_TEMPLATE="traces:{service.name}"
export ROTEL_REDIS_STREAM_EXPORTER_FORMAT=json

Configuration

| Flag | Env Var | Default | Description |
|---|---|---|---|
| `--redis-stream-exporter-endpoint` | `ROTEL_REDIS_STREAM_EXPORTER_ENDPOINT` | `redis://localhost:6379` | Redis endpoint URL (`redis://` or `rediss://` for TLS) |
| `--redis-stream-exporter-stream-key-template` | `ROTEL_REDIS_STREAM_EXPORTER_STREAM_KEY_TEMPLATE` | `rotel:traces` | Stream key with optional `{attribute}` placeholders |
| `--redis-stream-exporter-format` | `ROTEL_REDIS_STREAM_EXPORTER_FORMAT` | `json` | Serialization format: `json` or `flat` |
| `--redis-stream-exporter-maxlen` | `ROTEL_REDIS_STREAM_EXPORTER_MAXLEN` | None (unlimited) | Approximate max stream length (`MAXLEN ~N`) |
| `--redis-stream-exporter-cluster-mode` | `ROTEL_REDIS_STREAM_EXPORTER_CLUSTER_MODE` | `false` | Enable Redis Cluster mode |
| `--redis-stream-exporter-ca-cert-path` | `ROTEL_REDIS_STREAM_EXPORTER_CA_CERT_PATH` | None | Custom CA cert for TLS |
| `--redis-stream-exporter-username` | `ROTEL_REDIS_STREAM_EXPORTER_USERNAME` | None | Redis username |
| `--redis-stream-exporter-password` | `ROTEL_REDIS_STREAM_EXPORTER_PASSWORD` | None | Redis password |
| `--redis-stream-exporter-pipeline-size` | `ROTEL_REDIS_STREAM_EXPORTER_PIPELINE_SIZE` | None (all at once) | Max XADD commands per pipeline batch |
| `--redis-stream-exporter-filter-service-names` | `ROTEL_REDIS_STREAM_EXPORTER_FILTER_SERVICE_NAMES` | None (all services) | Comma-separated list of service names to export |
| `--redis-stream-exporter-key-ttl-seconds` | `ROTEL_REDIS_STREAM_EXPORTER_KEY_TTL_SECONDS` | None (no expiry) | TTL in seconds for stream keys |

Architecture

Service Name Filtering

When --redis-stream-exporter-filter-service-names is set, only ResourceSpans whose service.name resource attribute matches one of the specified names are exported. Non-matching spans are silently dropped (metadata is still acknowledged). When empty (default), all spans pass through.
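The check described above can be sketched as follows. This is a minimal stdlib-only illustration, not the exporter's actual code: the `ResourceSpans` struct and `should_export` name are simplified stand-ins for the OTLP types.

```rust
use std::collections::HashMap;

// Simplified stand-in for the OTLP ResourceSpans type.
struct ResourceSpans {
    resource_attributes: HashMap<String, String>,
}

fn should_export(filter: &Option<Vec<String>>, rs: &ResourceSpans) -> bool {
    match filter {
        // No filter configured (default): all spans pass through.
        None => true,
        Some(names) => rs
            .resource_attributes
            .get("service.name")
            .map(|name| names.iter().any(|n| n == name))
            // No service.name attribute while filtering is active: drop.
            .unwrap_or(false),
    }
}
```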

Stream Key Templates

Templates are parsed at config time into Vec<TemplatePart> (Literal/Placeholder). Per-span resolution looks up placeholders in span attributes first, then resource attributes, with missing values resolving to empty string.

"traces:{service.name}:{deployment.environment}"
 → [Literal("traces:"), Placeholder("service.name"), Literal(":"), Placeholder("deployment.environment")]

In Redis Cluster, {...} in keys also serves as a hash tag — spans with the same attribute values route to the same node. This is intentional.
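The parse/resolve flow can be sketched with the stdlib only. `TemplatePart` mirrors the description above, but the function names and exact parsing behavior here are assumptions, not the PR's implementation:

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum TemplatePart {
    Literal(String),
    Placeholder(String),
}

// Parse once at config time into literal/placeholder parts.
fn parse_template(template: &str) -> Vec<TemplatePart> {
    let mut parts = Vec::new();
    let mut rest = template;
    while let Some(open) = rest.find('{') {
        if open > 0 {
            parts.push(TemplatePart::Literal(rest[..open].to_string()));
        }
        match rest[open..].find('}') {
            Some(close) => {
                parts.push(TemplatePart::Placeholder(rest[open + 1..open + close].to_string()));
                rest = &rest[open + close + 1..];
            }
            // Unclosed brace: treat the remainder as a literal (an assumed edge-case policy).
            None => {
                parts.push(TemplatePart::Literal(rest[open..].to_string()));
                rest = "";
            }
        }
    }
    if !rest.is_empty() {
        parts.push(TemplatePart::Literal(rest.to_string()));
    }
    parts
}

// Per-span resolution: span attributes first, then resource attributes,
// with missing values resolving to the empty string.
fn resolve(
    parts: &[TemplatePart],
    span_attrs: &HashMap<String, String>,
    resource_attrs: &HashMap<String, String>,
) -> String {
    parts
        .iter()
        .map(|p| match p {
            TemplatePart::Literal(s) => s.clone(),
            TemplatePart::Placeholder(key) => span_attrs
                .get(key)
                .or_else(|| resource_attrs.get(key))
                .cloned()
                .unwrap_or_default(),
        })
        .collect()
}
```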

Serialization Formats

JSON mode (default) — Single data field containing the full span as a JSON object:

XADD rotel:traces * data '{"trace_id":"abc...","span_name":"GET /api","resource_attributes":{"service.name":"my-svc"},...}'

Flat mode — Individual field/value pairs for each span attribute:

XADD rotel:traces * trace_id abc... span_id def... service_name my-svc span_name "GET /api" attr.http.method GET resource.service.name my-svc ...
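The flat-mode field generation for one span might look like the sketch below. The `attr.` / `resource.` prefixes follow the XADD example above; the function signature and span fields are simplified assumptions, not the transformer's real API.

```rust
// Build field/value pairs for a single span in flat mode.
fn flat_fields(
    trace_id: &str,
    span_name: &str,
    span_attrs: &[(String, String)],
    resource_attrs: &[(String, String)],
) -> Vec<(String, String)> {
    let mut fields = vec![
        ("trace_id".to_string(), trace_id.to_string()),
        ("span_name".to_string(), span_name.to_string()),
    ];
    // Span attributes carry an "attr." prefix, resource attributes "resource.".
    for (k, v) in span_attrs {
        fields.push((format!("attr.{k}"), v.clone()));
    }
    for (k, v) in resource_attrs {
        fields.push((format!("resource.{k}"), v.clone()));
    }
    fields
}
```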

Exporter Run Loop

Follows the established non-HTTP exporter pattern (Blackhole/Kafka):

BoundedReceiver<Vec<Message<ResourceSpans>>>
  → select! { rx.next() | cancel_token.cancelled() }
  → process_batch():
      1. Collect MessageMetadata for ack tracking
      2. For each ResourceSpans → ScopeSpans → Span:
         - Resolve stream key template
         - Transform span to fields
      3. Build Redis Pipeline of XADD commands
      4. Execute pipeline (split into sub-batches if pipeline_size configured)
      5. Ack all metadata on success
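Step 4's sub-batching can be illustrated with a small helper: commands are split into chunks of at most `pipeline_size`, or kept as one batch when it is unset. This is a sketch only; the generic `chunk_commands` is a stand-in for slicing the real redis pipeline.

```rust
// Split XADD commands into pipeline sub-batches.
// pipeline_size = None means "all at once" (a single chunk).
fn chunk_commands<T: Clone>(commands: &[T], pipeline_size: Option<usize>) -> Vec<Vec<T>> {
    // max(1) guards against chunks(0) panicking on an empty input.
    let size = pipeline_size.unwrap_or(commands.len().max(1));
    commands.chunks(size).map(|c| c.to_vec()).collect()
}
```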

Connection Abstraction

RedisConnection enum dispatches to standalone ConnectionManager or cluster ClusterConnection:

  • Connections established lazily inside start() (not during construction)
  • TLS activated via rediss:// URL scheme
  • Valkey works out of the box (redis-rs uses RESP protocol)

Feature Flag

The exporter is gated behind the redis_exporter feature and is not part of the default feature set:

# Cargo.toml
redis = { version = "0.27", features = ["tokio-comp", "connection-manager", "cluster-async", "tokio-rustls-comp", "streams"], optional = true }

[features]
redis_exporter = ["dep:redis"]

Build with: cargo build --features redis_exporter

Changes

New Files

| File | Purpose |
|---|---|
| `src/exporters/redis_stream/mod.rs` | Module root, public re-exports |
| `src/exporters/redis_stream/config.rs` | `RedisStreamExporterConfig` with builder pattern |
| `src/exporters/redis_stream/errors.rs` | `RedisStreamExportError` enum (thiserror) |
| `src/exporters/redis_stream/exporter.rs` | Run loop, batch processing, pipeline execution |
| `src/exporters/redis_stream/client.rs` | `RedisConnection` enum (standalone/cluster) |
| `src/exporters/redis_stream/transformer.rs` | OTLP span → flat fields or JSON blob |
| `src/exporters/redis_stream/stream_key.rs` | Stream key template parsing & resolution |
| `src/init/redis_stream_exporter.rs` | CLI args with clap/figment |

Modified Files

| File | Changes |
|---|---|
| `Cargo.toml` | Added `redis` optional dep + `redis_exporter` feature |
| `src/exporters/mod.rs` | Registered `redis_stream` module |
| `src/init/mod.rs` | Registered `redis_stream_exporter` init module |
| `src/init/args.rs` | Added `RedisStream` to `Exporter` enum + field to `AgentRun` |
| `src/init/config.rs` | Added `RedisStream` to `ExporterArgs`, `ExporterConfig`, `TryIntoConfig`, `args_from_env_prefix`, `name()`, `get_single_exporter_config` |
| `src/init/agent.rs` | Added `ExporterConfig::RedisStream` match arm in traces pipeline |

Test Plan

  • 10 unit tests: stream key parsing/resolution (7), transformer flat/JSON output (2), unclosed brace edge case (1)
  • All 528 tests pass with --features redis_exporter
  • All 518 default-feature tests pass (no regressions)
  • cargo build --features redis_exporter compiles cleanly
  • cargo build (default features) compiles cleanly
  • Manual: rotel start --exporter redis-stream, send OTLP traces, verify via redis-cli XRANGE rotel:traces - +

Scope & Future Work

This PR adds traces only. The connection layer supports:

  • TLS: Via rediss:// URL scheme (automatic through redis-rs tokio-rustls-comp feature)
  • Custom CA certs: Via --redis-stream-exporter-ca-cert-path for environments like GCP Managed Redis
  • Username/password auth: Injected into the Redis connection URL
  • Cluster mode: ClusterClient with TLS and auth support
  • Service name filtering: Via --redis-stream-exporter-filter-service-names

Future work:

  • Logs/metrics exporters: Add build_logs_exporter / build_metrics_exporter with corresponding transformer functions
  • Retry logic: Currently pipeline failures are logged and the spans are lost; retry with backoff can be layered in
  • Connection pooling: For high-throughput deployments

rjenkins self-requested a review March 3, 2026 22:49

rjenkins (Contributor) left a comment:

Hi @brightsparc thanks for the PR. I have a few more comments to add, but dropping these into the PR now for early discussion.

}
}

fn span_kind_to_string<'a>(kind: i32) -> &'a str {
rjenkins (Contributor):
Did we intend to modify the Clickhouse exporter here or are you merging in changes from main?

brightsparc (Author):
I thought I would move the common otel functions to a shared library given we need the same functions for the redis export.

prefix: String,
result: &mut Vec<(String, String)>,
) {
use opentelemetry_proto::tonic::common::v1::any_value::Value;
rjenkins (Contributor):
Same question as above in regards to Clickhouse exporter.

}
}
}
None => break,
rjenkins (Contributor):
I think we'll want a drain_futures method to handle any remaining messages on shutdown. Check out the Kafka, OTLP, or any exporter built on the HTTP export code for an example.

match m {
Some(messages) => {
let batch_size = messages.iter().map(|m| m.payload.len()).sum::<usize>();
if let Err(e) = self.process_batch(messages).await {
rjenkins (Contributor):
We're going to need to implement Nack support here and configurable infinite retry so that, when used in conjunction with the Kafka, file, or kmsg receivers, no data is lost.

}

// Ack all metadata on success
for metadata in all_metadata {
rjenkins (Contributor):
Do you think we need to move this up to after conn.execute_pipeline? If we successfully send N chunks but then fail at chunk N+1, we'll exit this function early on .await? and never acknowledge the successfully sent chunks.


let maxlen = self.config.maxlen.map(StreamMaxlen::Approx);

let chunk_size = self.config.pipeline_size.unwrap_or(usize::MAX);
rjenkins (Contributor):
I think we should set a sensible default here, otherwise we might hit errors on the Redis side in regards to buffer sizes and memory limits.

rx: BoundedReceiver<Vec<Message<ResourceSpans>>>,
}

pub fn build_traces_exporter(
rjenkins (Contributor):
If this exporter is only going to support traces, we're going to need some guards in config.rs on initialization so a user doesn't manually assign metrics or logs to this exporter in a multi-exporter config.
