feat: Add redis streams exporter, with cluster, tls support #314

brightsparc wants to merge 9 commits into streamfold:main from
Conversation
rjenkins
left a comment
Hi @brightsparc, thanks for the PR. I have a few more comments to add, but I'm dropping these into the PR now for early discussion.
```rust
}
}

fn span_kind_to_string<'a>(kind: i32) -> &'a str {
```
rjenkins: Did we intend to modify the Clickhouse exporter here, or are you merging in changes from main?

brightsparc: I thought I would move the common OTel functions to a shared library, given we need the same functions for the Redis exporter.
```rust
    prefix: String,
    result: &mut Vec<(String, String)>,
) {
    use opentelemetry_proto::tonic::common::v1::any_value::Value;
```
Same question as above regarding the Clickhouse exporter.
```rust
        }
    }
}
None => break,
```
I think we'll want a `drain_futures` method to handle any remaining messages on shutdown. Check out the Kafka, OTLP, or any exporter built on the HTTP export code for an example.
```rust
match m {
    Some(messages) => {
        let batch_size = messages.iter().map(|m| m.payload.len()).sum::<usize>();
        if let Err(e) = self.process_batch(messages).await {
```
We're going to need to implement Nack support here and configurable infinite retry so that, when used in conjunction with the Kafka, file, or kmsg receiver, we can ensure no data loss.
```rust
}

// Ack all metadata on success
for metadata in all_metadata {
```
Do you think we need to move this up, to right after `conn.execute_pipeline`? If we successfully send N chunks but then fail at chunk N+1, we'll exit this function early on `.await?` and never acknowledge the successfully sent chunks.
```rust
let maxlen = self.config.maxlen.map(StreamMaxlen::Approx);

let chunk_size = self.config.pipeline_size.unwrap_or(usize::MAX);
```
I think we should set a sensible default here; otherwise we might hit errors on the Redis side with respect to buffer sizes and memory limits.
```rust
    rx: BoundedReceiver<Vec<Message<ResourceSpans>>>,
}

pub fn build_traces_exporter(
```
If this exporter is only going to support traces, we're going to need some guards in `config.rs` on initialization so a user doesn't manually assign metrics or logs to this exporter in a multi-exporter config.
Summary
Adds a new Redis Stream exporter that writes OTLP trace data to Redis Streams via `XADD`. This enables downstream consumers to process span data in real time using Redis consumer groups. The exporter supports attribute-based stream key routing, both flat and JSON serialization modes, and works with standalone Redis, clustered Redis, GCP Managed Redis (via custom CA), and Valkey.

Motivation
Redis Streams provide a lightweight, high-throughput append-only log with built-in consumer group semantics. For teams already running Redis, this exporter offers a simpler alternative to Kafka for real-time span processing pipelines — no additional infrastructure required.
Key use case: route spans to per-service streams using attribute-based key templates (e.g., `traces:{service.name}`), allowing independent consumer groups per service.

Usage
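A representative invocation (the flag values here are illustrative; `rotel start --exporter redis-stream` and the flags themselves come from the Test Plan and Configuration sections):

```shell
rotel start --exporter redis-stream \
  --redis-stream-exporter-endpoint redis://localhost:6379 \
  --redis-stream-exporter-stream-key-template "rotel:traces:{service.name}" \
  --redis-stream-exporter-format json
```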
Or via environment variables:
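The same settings expressed as environment variables (values illustrative):

```shell
export ROTEL_REDIS_STREAM_EXPORTER_ENDPOINT=redis://localhost:6379
export ROTEL_REDIS_STREAM_EXPORTER_STREAM_KEY_TEMPLATE="rotel:traces:{service.name}"
export ROTEL_REDIS_STREAM_EXPORTER_FORMAT=json
```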
Configuration
| Flag | Environment variable | Default | Notes |
| --- | --- | --- | --- |
| `--redis-stream-exporter-endpoint` | `ROTEL_REDIS_STREAM_EXPORTER_ENDPOINT` | `redis://localhost:6379` | `redis://` or `rediss://` for TLS |
| `--redis-stream-exporter-stream-key-template` | `ROTEL_REDIS_STREAM_EXPORTER_STREAM_KEY_TEMPLATE` | `rotel:traces` | supports `{attribute}` placeholders |
| `--redis-stream-exporter-format` | `ROTEL_REDIS_STREAM_EXPORTER_FORMAT` | `json` | `json` or `flat` |
| `--redis-stream-exporter-maxlen` | `ROTEL_REDIS_STREAM_EXPORTER_MAXLEN` | | approximate stream trimming (`MAXLEN ~N`) |
| `--redis-stream-exporter-cluster-mode` | `ROTEL_REDIS_STREAM_EXPORTER_CLUSTER_MODE` | `false` | |
| `--redis-stream-exporter-ca-cert-path` | `ROTEL_REDIS_STREAM_EXPORTER_CA_CERT_PATH` | | |
| `--redis-stream-exporter-username` | `ROTEL_REDIS_STREAM_EXPORTER_USERNAME` | | |
| `--redis-stream-exporter-password` | `ROTEL_REDIS_STREAM_EXPORTER_PASSWORD` | | |
| `--redis-stream-exporter-pipeline-size` | `ROTEL_REDIS_STREAM_EXPORTER_PIPELINE_SIZE` | | |
| `--redis-stream-exporter-filter-service-names` | `ROTEL_REDIS_STREAM_EXPORTER_FILTER_SERVICE_NAMES` | | |
| `--redis-stream-exporter-key-ttl-seconds` | `ROTEL_REDIS_STREAM_EXPORTER_KEY_TTL_SECONDS` | | |

Architecture
Service Name Filtering
When `--redis-stream-exporter-filter-service-names` is set, only `ResourceSpans` whose `service.name` resource attribute matches one of the specified names are exported. Non-matching spans are silently dropped (their metadata is still acknowledged). When empty (the default), all spans pass through.

Stream Key Templates

Templates are parsed at config time into a `Vec<TemplatePart>` (`Literal`/`Placeholder`). Per-span resolution looks up placeholders in span attributes first, then resource attributes, with missing values resolving to the empty string.

In Redis Cluster, `{...}` in keys also serves as a hash tag, so spans with the same attribute values route to the same node. This is intentional.

Serialization Formats

JSON mode (default): a single `data` field containing the full span as a JSON object.

Flat mode: individual field/value pairs for each span attribute.
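Flat mode can be sketched as follows. The `prefix`/`result` signature mirrors the transformer function visible in the review diff, but the simplified `AnyValue` type and the function body are illustrative assumptions, not the PR's actual code:

```rust
// Illustrative sketch: a simplified AnyValue stands in for the
// opentelemetry-proto value type; the real transformer may differ.
enum AnyValue {
    Str(String),
    Int(i64),
    Map(Vec<(String, AnyValue)>),
}

/// Flatten nested attributes into XADD field/value pairs,
/// joining nested keys with '.' under the given prefix.
fn flatten(value: &AnyValue, prefix: String, result: &mut Vec<(String, String)>) {
    match value {
        AnyValue::Str(s) => result.push((prefix, s.clone())),
        AnyValue::Int(i) => result.push((prefix, i.to_string())),
        AnyValue::Map(entries) => {
            for (k, v) in entries {
                let key = if prefix.is_empty() {
                    k.clone()
                } else {
                    format!("{prefix}.{k}")
                };
                flatten(v, key, result);
            }
        }
    }
}

fn main() {
    let mut fields = Vec::new();
    let attrs = AnyValue::Map(vec![(
        "http".to_string(),
        AnyValue::Map(vec![("status_code".to_string(), AnyValue::Int(200))]),
    )]);
    flatten(&attrs, String::new(), &mut fields);
    println!("{:?}", fields); // [("http.status_code", "200")]
}
```

A nested attribute map like `{http: {status_code: 200}}` thus becomes a single `http.status_code` field, which maps directly onto XADD field/value pairs.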
Exporter Run Loop
Follows the established non-HTTP exporter pattern (Blackhole/Kafka):
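The loop shape can be sketched roughly like this. The real exporter is async (tokio) over a `BoundedReceiver` of message batches; this synchronous `std::sync::mpsc` version is purely illustrative:

```rust
// Simplified, synchronous sketch of the exporter run loop; the actual
// implementation is async and processes batches via process_batch().
use std::sync::mpsc;

fn run_loop(rx: mpsc::Receiver<Vec<String>>) -> usize {
    let mut exported = 0;
    loop {
        match rx.recv() {
            // A batch of messages arrived: serialize and XADD them.
            Ok(batch) => {
                // process_batch(batch) would go here; we just count.
                exported += batch.len();
            }
            // Channel closed: the pipeline is shutting down, exit the loop.
            Err(_) => break,
        }
    }
    exported
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(vec!["span-1".to_string(), "span-2".to_string()]).unwrap();
    tx.send(vec!["span-3".to_string()]).unwrap();
    drop(tx); // simulate shutdown
    println!("{}", run_loop(rx)); // prints 3
}
```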
Connection Abstraction
- `RedisConnection` enum dispatches to standalone `ConnectionManager` or cluster `ClusterConnection`
- connections are established in `start()` (not during construction)
- TLS is enabled via the `rediss://` URL scheme

Feature Flag
The exporter is behind the `redis_exporter` feature and is not included in the default features. Build with `cargo build --features redis_exporter`.

Changes
New Files
- `src/exporters/redis_stream/mod.rs`
- `src/exporters/redis_stream/config.rs`: `RedisStreamExporterConfig` with builder pattern
- `src/exporters/redis_stream/errors.rs`: `RedisStreamExportError` enum (thiserror)
- `src/exporters/redis_stream/exporter.rs`
- `src/exporters/redis_stream/client.rs`: `RedisConnection` enum (standalone/cluster)
- `src/exporters/redis_stream/transformer.rs`
- `src/exporters/redis_stream/stream_key.rs`
- `src/init/redis_stream_exporter.rs`

Modified Files
- `Cargo.toml`: `redis` optional dep + `redis_exporter` feature
- `src/exporters/mod.rs`: `redis_stream` module
- `src/init/mod.rs`: `redis_stream_exporter` init module
- `src/init/args.rs`: `RedisStream` added to `Exporter` enum + field added to `AgentRun`
- `src/init/config.rs`: `RedisStream` added to `ExporterArgs`, `ExporterConfig`, `TryIntoConfig`, `args_from_env_prefix`, `name()`, `get_single_exporter_config`
- `src/init/agent.rs`: `ExporterConfig::RedisStream` match arm in traces pipeline

Test Plan
- unit tests run with `--features redis_exporter`
- `cargo build --features redis_exporter` compiles cleanly
- `cargo build` (default features) compiles cleanly
- manual: `rotel start --exporter redis-stream`, send OTLP traces, verify via `redis-cli XRANGE rotel:traces - +`

Scope & Future Work
This PR adds traces only. The connection layer supports:
- TLS via the `rediss://` URL scheme (automatic through the redis-rs `tokio-rustls-comp` feature)
- custom CA via `--redis-stream-exporter-ca-cert-path` for environments like GCP Managed Redis
- `ClusterClient` with TLS and auth support
- service name filtering via `--redis-stream-exporter-filter-service-names`

Future work:
- `build_logs_exporter`/`build_metrics_exporter` with corresponding transformer functions