From f362b65992b55327b3ccf8a1fbd12eaa9090fa5b Mon Sep 17 00:00:00 2001 From: Dennis Paler Date: Sun, 5 Apr 2026 09:29:39 +0800 Subject: [PATCH 1/6] feat: add Homebrew formula and tap auto-update on release --- .github/workflows/release.yml | 94 ++++++++++++++++++++++++++++++++++- README.md | 5 +- homebrew/tailflow.rb.tmpl | 35 +++++++++++++ 3 files changed, 130 insertions(+), 4 deletions(-) create mode 100644 homebrew/tailflow.rb.tmpl diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c057383..702ed6d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -165,6 +165,19 @@ jobs: path: npm/platforms/${{ matrix.platform }}/bin/ retention-days: 1 + - name: Create tarball (Unix only — for Homebrew) + if: runner.os != 'Windows' + run: | + cd npm/platforms/${{ matrix.platform }}/bin + tar -czf tailflow-${{ matrix.platform }}.tar.gz tailflow tailflow-daemon + + - uses: actions/upload-artifact@v4 + if: runner.os != 'Windows' + with: + name: tarball-${{ matrix.platform }} + path: npm/platforms/${{ matrix.platform }}/bin/tailflow-${{ matrix.platform }}.tar.gz + retention-days: 1 + # ── Step 3: publish all npm packages ────────────────────────────────────── publish-npm: name: Publish to npm @@ -235,13 +248,90 @@ jobs: NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} run: cd npm/tailflow && npm publish + # Download tarballs so they can be attached to the GitHub Release + - uses: actions/download-artifact@v4 + with: + name: tarball-darwin-arm64 + path: dist/ + - uses: actions/download-artifact@v4 + with: + name: tarball-darwin-x64 + path: dist/ + - uses: actions/download-artifact@v4 + with: + name: tarball-linux-x64 + path: dist/ + - uses: actions/download-artifact@v4 + with: + name: tarball-linux-arm64 + path: dist/ + # Create a GitHub Release with the tag as the title - name: Create GitHub Release uses: softprops/action-gh-release@v2 with: generate_release_notes: true files: | - npm/platforms/*/bin/tailflow - 
npm/platforms/*/bin/tailflow-daemon + dist/tailflow-darwin-arm64.tar.gz + dist/tailflow-darwin-x64.tar.gz + dist/tailflow-linux-arm64.tar.gz + dist/tailflow-linux-x64.tar.gz npm/platforms/win32-x64/bin/tailflow.exe npm/platforms/win32-x64/bin/tailflow-daemon.exe + + # ── Step 4: update Homebrew tap ─────────────────────────────────────────── + publish-homebrew: + name: Update Homebrew tap + needs: build-binaries + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/download-artifact@v4 + with: + name: tarball-darwin-arm64 + path: dist/ + - uses: actions/download-artifact@v4 + with: + name: tarball-darwin-x64 + path: dist/ + - uses: actions/download-artifact@v4 + with: + name: tarball-linux-arm64 + path: dist/ + - uses: actions/download-artifact@v4 + with: + name: tarball-linux-x64 + path: dist/ + + - name: Compute SHA256s and render formula + run: | + VERSION="${GITHUB_REF_NAME#v}" + SHA_DARWIN_ARM64=$(sha256sum dist/tailflow-darwin-arm64.tar.gz | cut -d' ' -f1) + SHA_DARWIN_X64=$(sha256sum dist/tailflow-darwin-x64.tar.gz | cut -d' ' -f1) + SHA_LINUX_ARM64=$(sha256sum dist/tailflow-linux-arm64.tar.gz | cut -d' ' -f1) + SHA_LINUX_X64=$(sha256sum dist/tailflow-linux-x64.tar.gz | cut -d' ' -f1) + + sed \ + -e "s/{{VERSION}}/$VERSION/g" \ + -e "s/{{SHA256_DARWIN_ARM64}}/$SHA_DARWIN_ARM64/g" \ + -e "s/{{SHA256_DARWIN_X64}}/$SHA_DARWIN_X64/g" \ + -e "s/{{SHA256_LINUX_ARM64}}/$SHA_LINUX_ARM64/g" \ + -e "s/{{SHA256_LINUX_X64}}/$SHA_LINUX_X64/g" \ + homebrew/tailflow.rb.tmpl > tailflow.rb + + - name: Push formula to homebrew-tap + env: + HOMEBREW_TAP_TOKEN: ${{ secrets.HOMEBREW_TAP_TOKEN }} + run: | + git clone \ + https://x-access-token:$HOMEBREW_TAP_TOKEN@github.com/thinkgrid-labs/homebrew-tap.git \ + tap + mkdir -p tap/Formula + cp tailflow.rb tap/Formula/tailflow.rb + cd tap + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + git add Formula/tailflow.rb + git diff --cached 
--quiet || git commit -m "tailflow: update to ${{ github.ref_name }}" + git push diff --git a/README.md b/README.md index ed1c3f5..bb2b6cd 100644 --- a/README.md +++ b/README.md @@ -109,7 +109,8 @@ npm installs only the binary matching your OS and CPU via platform-specific opti ### Homebrew (macOS / Linux) ```bash -brew install your-org/tap/tailflow +brew tap thinkgrid-labs/tap +brew install tailflow ``` ### From source — requires Rust 1.75+ @@ -431,7 +432,7 @@ tailflow/ - [x] axum SSE daemon with ring buffer - [x] Preact web dashboard embedded in the daemon binary - [x] npm / npx distribution — no Rust toolchain required -- [ ] Homebrew formula for macOS and Linux +- [x] Homebrew formula for macOS and Linux - [ ] Server-side `--grep` and `--source` filter flags for the daemon - [ ] Process restart policy for crashed `[[sources.process]]` entries - [ ] JSON log pretty-printing — detect structured payloads and expand inline diff --git a/homebrew/tailflow.rb.tmpl b/homebrew/tailflow.rb.tmpl new file mode 100644 index 0000000..3418b78 --- /dev/null +++ b/homebrew/tailflow.rb.tmpl @@ -0,0 +1,35 @@ +class Tailflow < Formula + desc "Zero-configuration local log aggregator for full-stack developers" + homepage "https://github.com/thinkgrid-labs/tailflow" + version "{{VERSION}}" + license "MIT" + + on_macos do + if Hardware::CPU.arm? + url "https://github.com/thinkgrid-labs/tailflow/releases/download/v{{VERSION}}/tailflow-darwin-arm64.tar.gz" + sha256 "{{SHA256_DARWIN_ARM64}}" + else + url "https://github.com/thinkgrid-labs/tailflow/releases/download/v{{VERSION}}/tailflow-darwin-x64.tar.gz" + sha256 "{{SHA256_DARWIN_X64}}" + end + end + + on_linux do + if Hardware::CPU.arm? 
+ url "https://github.com/thinkgrid-labs/tailflow/releases/download/v{{VERSION}}/tailflow-linux-arm64.tar.gz" + sha256 "{{SHA256_LINUX_ARM64}}" + else + url "https://github.com/thinkgrid-labs/tailflow/releases/download/v{{VERSION}}/tailflow-linux-x64.tar.gz" + sha256 "{{SHA256_LINUX_X64}}" + end + end + + def install + bin.install "tailflow" + bin.install "tailflow-daemon" + end + + test do + assert_match version.to_s, shell_output("#{bin}/tailflow --version") + end +end From b7c08f70bb1418a54f64b2d248d4a9aec607dd65 Mon Sep 17 00:00:00 2001 From: Dennis Paler Date: Sun, 5 Apr 2026 09:33:50 +0800 Subject: [PATCH 2/6] feat: server-side --grep and --source filter flags for the daemon --- README.md | 2 +- crates/tailflow-core/src/processor/mod.rs | 56 +++++++++++++++++++--- crates/tailflow-daemon/src/main.rs | 24 ++++++++++ crates/tailflow-daemon/src/routes.rs | 58 +++++++++++++++++++++-- 4 files changed, 127 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index bb2b6cd..3f65bf3 100644 --- a/README.md +++ b/README.md @@ -433,7 +433,7 @@ tailflow/ - [x] Preact web dashboard embedded in the daemon binary - [x] npm / npx distribution — no Rust toolchain required - [x] Homebrew formula for macOS and Linux -- [ ] Server-side `--grep` and `--source` filter flags for the daemon +- [x] Server-side `--grep` and `--source` filter flags for the daemon - [ ] Process restart policy for crashed `[[sources.process]]` entries - [ ] JSON log pretty-printing — detect structured payloads and expand inline diff --git a/crates/tailflow-core/src/processor/mod.rs b/crates/tailflow-core/src/processor/mod.rs index e3ffbc8..8fbd902 100644 --- a/crates/tailflow-core/src/processor/mod.rs +++ b/crates/tailflow-core/src/processor/mod.rs @@ -3,24 +3,41 @@ use regex::Regex; use tokio::sync::broadcast; pub struct Filter { - pattern: Option, + /// Regex matched against `record.payload`. + grep: Option, + /// Substring matched against `record.source`. 
+ source: Option, } impl Filter { pub fn none() -> Self { - Self { pattern: None } + Self { grep: None, source: None } } + /// Build a filter that matches records whose payload matches `pattern`. pub fn regex(pattern: &str) -> Result { Ok(Self { - pattern: Some(Regex::new(pattern)?), + grep: Some(Regex::new(pattern)?), + source: None, }) } + /// Add (or replace) a source substring filter. + pub fn with_source(mut self, source: impl Into) -> Self { + self.source = Some(source.into()); + self + } + + /// Returns `true` if the record passes all active filters. pub fn matches(&self, record: &LogRecord) -> bool { - match &self.pattern { + if let Some(src) = &self.source { + if !record.source.contains(src.as_str()) { + return false; + } + } + match &self.grep { None => true, - Some(re) => re.is_match(&record.payload) || re.is_match(&record.source), + Some(re) => re.is_match(&record.payload), } } } @@ -82,12 +99,37 @@ mod tests { } #[test] - fn filter_regex_matches_source_name() { - let f = Filter::regex("^api$").unwrap(); + fn filter_source_matches_source_name() { + let f = Filter::none().with_source("api"); assert!(f.matches(&record("api", "anything"))); assert!(!f.matches(&record("worker", "anything"))); } + #[test] + fn filter_source_is_substring_match() { + let f = Filter::none().with_source("web"); + assert!(f.matches(&record("web-server", "started"))); + assert!(f.matches(&record("web-worker", "started"))); + assert!(!f.matches(&record("api", "started"))); + } + + #[test] + fn filter_grep_only_matches_payload_not_source() { + let f = Filter::regex("api").unwrap(); + // payload matches + assert!(f.matches(&record("worker", "calling api endpoint"))); + // source name alone does not match + assert!(!f.matches(&record("api", "server started"))); + } + + #[test] + fn filter_grep_and_source_both_must_match() { + let f = Filter::regex("error").unwrap().with_source("web"); + assert!(f.matches(&record("web-server", "error: timeout"))); + assert!(!f.matches(&record("api", 
"error: timeout"))); // source mismatch + assert!(!f.matches(&record("web-server", "all good"))); // grep mismatch + } + #[test] fn filter_regex_is_case_sensitive_by_default() { let f = Filter::regex("ERROR").unwrap(); diff --git a/crates/tailflow-daemon/src/main.rs b/crates/tailflow-daemon/src/main.rs index e63a2e7..8224b12 100644 --- a/crates/tailflow-daemon/src/main.rs +++ b/crates/tailflow-daemon/src/main.rs @@ -8,6 +8,7 @@ use tailflow_core::{ config::Config, ingestion::{docker::DockerSource, file::FileSource, stdin::StdinSource, Source}, new_bus, + processor::{filtered_bus, Filter}, }; use tracing::info; use tracing_subscriber::EnvFilter; @@ -37,6 +38,14 @@ struct Cli { /// Path to tailflow.toml (auto-discovered if omitted) #[arg(long, value_name = "PATH")] config: Option, + + /// Only stream records whose payload matches this regex + #[arg(long, value_name = "REGEX")] + grep: Option, + + /// Only stream records from sources whose name contains this string + #[arg(long, value_name = "NAME")] + source: Option, } #[tokio::main] @@ -95,6 +104,21 @@ async fn main() -> Result<()> { } drop(tx); + // Apply global CLI filters before records enter the ring buffer / SSE bus. 
+ let rx = { + let mut filter = match cli.grep.as_deref() { + Some(pat) => Filter::regex(pat).unwrap_or_else(|e| { + eprintln!("tailflow-daemon: invalid --grep regex ({e}), filter ignored"); + Filter::none() + }), + None => Filter::none(), + }; + if let Some(src) = cli.source { + filter = filter.with_source(src); + } + filtered_bus(rx, filter) + }; + let shared = state::AppState::new(rx); let app = routes::router(shared); diff --git a/crates/tailflow-daemon/src/routes.rs b/crates/tailflow-daemon/src/routes.rs index f344863..a77b27c 100644 --- a/crates/tailflow-daemon/src/routes.rs +++ b/crates/tailflow-daemon/src/routes.rs @@ -1,6 +1,6 @@ use crate::state::AppState; use axum::{ - extract::State, + extract::{Query, State}, http::{header, StatusCode, Uri}, response::{ sse::{Event, KeepAlive, Sse}, @@ -10,7 +10,9 @@ use axum::{ Router, }; use rust_embed::RustEmbed; +use serde::Deserialize; use std::sync::Arc; +use tailflow_core::processor::Filter; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tower_http::cors::CorsLayer; @@ -32,17 +34,52 @@ pub fn router(state: Arc) -> Router { .with_state(state) } +// ── Shared filter params ────────────────────────────────────────────────────── + +/// Query parameters accepted by `/events` and `/api/records`. +/// +/// Examples: +/// GET /events?grep=error +/// GET /api/records?source=nginx +/// GET /events?grep=panic&source=api +#[derive(Debug, Deserialize, Default)] +struct FilterParams { + /// Regex matched against `record.payload`. + grep: Option, + /// Substring matched against `record.source`. 
+ source: Option, +} + +impl FilterParams { + fn into_filter(self) -> Filter { + let f = match self.grep.as_deref() { + Some(pat) => Filter::regex(pat).unwrap_or_else(|e| { + tracing::warn!(pattern = pat, err = %e, "invalid grep regex, ignoring"); + Filter::none() + }), + None => Filter::none(), + }; + match self.source { + Some(src) => f.with_source(src), + None => f, + } + } +} + // ── SSE ─────────────────────────────────────────────────────────────────────── async fn sse_handler( State(state): State>, + Query(params): Query, ) -> Sse>> { + let filter = params.into_filter(); let rx = state.tx.subscribe(); - let stream = BroadcastStream::new(rx).filter_map(|res| match res { - Ok(record) => { + let stream = BroadcastStream::new(rx).filter_map(move |res| match res { + Ok(record) if filter.matches(&record) => { let data = serde_json::to_string(&record).unwrap_or_default(); Some(Ok(Event::default().data(data))) } + Ok(_) => None, Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => { tracing::warn!(dropped = n, "SSE client lagged"); None @@ -54,8 +91,19 @@ async fn sse_handler( // ── REST ────────────────────────────────────────────────────────────────────── -async fn records_handler(State(state): State>) -> impl IntoResponse { - let records = state.ring.lock().unwrap().clone(); +async fn records_handler( + State(state): State>, + Query(params): Query, +) -> impl IntoResponse { + let filter = params.into_filter(); + let records: Vec<_> = state + .ring + .lock() + .unwrap() + .iter() + .filter(|r| filter.matches(r)) + .cloned() + .collect(); Json(records) } From aef1e8fc68f1a500a7aa79dbafb3e4e7e6682b9e Mon Sep 17 00:00:00 2001 From: Dennis Paler Date: Sun, 5 Apr 2026 09:38:11 +0800 Subject: [PATCH 3/6] fix: update processor test to use with_source() after Filter API change --- crates/tailflow-core/src/config.rs | 54 +++++++- crates/tailflow-core/src/ingestion/process.rs | 95 ++++++++++++-- crates/tailflow-core/src/processor/mod.rs | 5 +- 
crates/tailflow-core/tests/process_restart.rs | 122 ++++++++++++++++++ crates/tailflow-core/tests/processor.rs | 2 +- 5 files changed, 261 insertions(+), 17 deletions(-) create mode 100644 crates/tailflow-core/tests/process_restart.rs diff --git a/crates/tailflow-core/src/config.rs b/crates/tailflow-core/src/config.rs index fa9cdfe..9bde89b 100644 --- a/crates/tailflow-core/src/config.rs +++ b/crates/tailflow-core/src/config.rs @@ -36,10 +36,32 @@ pub struct FileEntry { pub label: Option, } +#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq, Default)] +#[serde(rename_all = "kebab-case")] +pub enum RestartPolicy { + /// Never restart — default behaviour. + #[default] + Never, + /// Restart after every exit, zero or non-zero. + Always, + /// Restart only when the process exits with a non-zero status. + OnFailure, +} + +fn default_restart_delay_ms() -> u64 { + 1_000 +} + #[derive(Debug, Deserialize)] pub struct ProcessEntry { pub cmd: String, pub label: String, + /// Restart policy on process exit. Defaults to `never`. + #[serde(default)] + pub restart: RestartPolicy, + /// Initial restart delay in milliseconds. Doubles on each attempt, capped at 30 s. 
+ #[serde(default = "default_restart_delay_ms")] + pub restart_delay_ms: u64, } impl Config { @@ -84,7 +106,9 @@ impl Config { } for entry in self.sources.process { - sources.push(Box::new(ProcessSource::new(entry.label, entry.cmd))); + let src = ProcessSource::new(entry.label, entry.cmd) + .with_restart(entry.restart, entry.restart_delay_ms); + sources.push(Box::new(src)); } if let Some(label) = self.sources.stdin { @@ -163,6 +187,34 @@ label = "app" assert!(cfg.sources.file[0].label.is_none()); } + #[test] + fn process_restart_defaults_to_never() { + let cfg = parse("[[sources.process]]\nlabel = \"api\"\ncmd = \"go run .\""); + assert_eq!(cfg.sources.process[0].restart, RestartPolicy::Never); + assert_eq!(cfg.sources.process[0].restart_delay_ms, 1_000); + } + + #[test] + fn process_restart_on_failure_parsed() { + let cfg = parse( + r#" +[[sources.process]] +label = "api" +cmd = "go run ." +restart = "on-failure" +restart_delay_ms = 2000 +"#, + ); + assert_eq!(cfg.sources.process[0].restart, RestartPolicy::OnFailure); + assert_eq!(cfg.sources.process[0].restart_delay_ms, 2_000); + } + + #[test] + fn process_restart_always_parsed() { + let cfg = parse("[[sources.process]]\nlabel = \"w\"\ncmd = \"x\"\nrestart = \"always\""); + assert_eq!(cfg.sources.process[0].restart, RestartPolicy::Always); + } + #[test] fn invalid_toml_returns_error() { let result: Result = toml::from_str("[[[[invalid toml"); diff --git a/crates/tailflow-core/src/ingestion/process.rs b/crates/tailflow-core/src/ingestion/process.rs index adb6127..2b008c9 100644 --- a/crates/tailflow-core/src/ingestion/process.rs +++ b/crates/tailflow-core/src/ingestion/process.rs @@ -1,7 +1,8 @@ use super::Source; -use crate::{LogLevel, LogRecord, LogSender}; +use crate::{config::RestartPolicy, LogLevel, LogRecord, LogSender}; use anyhow::Result; use chrono::Utc; +use std::{process::ExitStatus, time::Duration}; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, @@ -10,8 +11,10 @@ use tracing::{info, 
warn}; pub struct ProcessSource { label: String, - /// Shell command string executed via `sh -c` cmd: String, + restart_policy: RestartPolicy, + /// Initial restart delay in ms; doubles each attempt, capped at 30 s. + restart_delay_ms: u64, } impl ProcessSource { @@ -19,19 +22,20 @@ impl ProcessSource { Self { label: label.into(), cmd: cmd.into(), + restart_policy: RestartPolicy::Never, + restart_delay_ms: 1_000, } } -} -#[async_trait::async_trait] -impl Source for ProcessSource { - fn name(&self) -> &str { - &self.label + pub fn with_restart(mut self, policy: RestartPolicy, delay_ms: u64) -> Self { + self.restart_policy = policy; + self.restart_delay_ms = delay_ms; + self } - async fn run(self: Box, tx: LogSender) -> Result<()> { - info!(label = %self.label, cmd = %self.cmd, "spawning process"); - + /// Spawn the process once and stream its stdout/stderr into `tx`. + /// Returns the process exit status. + async fn run_once(&self, tx: &LogSender) -> Result { let mut child = Command::new("sh") .arg("-c") .arg(&self.cmd) @@ -80,10 +84,73 @@ impl Source for ProcessSource { stdout_task.await.ok(); stderr_task.await.ok(); - if !status.success() { - warn!(label = %self.label, code = ?status.code(), "process exited non-zero"); - } else { - info!(label = %self.label, "process exited"); + Ok(status) + } +} + +#[async_trait::async_trait] +impl Source for ProcessSource { + fn name(&self) -> &str { + &self.label + } + + async fn run(self: Box, tx: LogSender) -> Result<()> { + let mut attempt: u32 = 0; + + loop { + info!(label = %self.label, cmd = %self.cmd, attempt, "spawning process"); + + let status = self.run_once(&tx).await?; + + let should_restart = match self.restart_policy { + RestartPolicy::Never => false, + RestartPolicy::Always => true, + RestartPolicy::OnFailure => !status.success(), + }; + + if !should_restart { + if status.success() { + info!(label = %self.label, "process exited cleanly"); + } else { + warn!(label = %self.label, code = ?status.code(), "process 
exited non-zero"); + } + break; + } + + // Stop restarting if the bus has no receivers (daemon shutting down). + if tx.receiver_count() == 0 { + break; + } + + // Exponential backoff: delay * 2^attempt, capped at 30 s. + let delay_ms = + (self.restart_delay_ms.saturating_mul(1u64 << attempt.min(5))).min(30_000); + + let exit_desc = status + .code() + .map_or_else(|| "signal".to_string(), |c| c.to_string()); + + warn!( + label = %self.label, + exit = %exit_desc, + delay_ms, + attempt, + "process crashed, scheduling restart" + ); + + // Emit a synthetic record so the restart appears in every consumer's stream. + let _ = tx.send(LogRecord { + timestamp: Utc::now(), + source: self.label.clone(), + level: LogLevel::Warn, + payload: format!( + "[tailflow] process exited ({exit_desc}), restarting in {delay_ms} ms \ + (attempt {attempt})" + ), + }); + + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + attempt += 1; } Ok(()) diff --git a/crates/tailflow-core/src/processor/mod.rs b/crates/tailflow-core/src/processor/mod.rs index 8fbd902..af5275f 100644 --- a/crates/tailflow-core/src/processor/mod.rs +++ b/crates/tailflow-core/src/processor/mod.rs @@ -11,7 +11,10 @@ pub struct Filter { impl Filter { pub fn none() -> Self { - Self { grep: None, source: None } + Self { + grep: None, + source: None, + } } /// Build a filter that matches records whose payload matches `pattern`. diff --git a/crates/tailflow-core/tests/process_restart.rs b/crates/tailflow-core/tests/process_restart.rs new file mode 100644 index 0000000..f9fa947 --- /dev/null +++ b/crates/tailflow-core/tests/process_restart.rs @@ -0,0 +1,122 @@ +use std::time::{Duration, Instant}; +use tailflow_core::{ + config::RestartPolicy, + ingestion::{process::ProcessSource, Source}, + new_bus, +}; + +/// Collect up to `limit` records from `rx` within `timeout`. 
+async fn collect( + mut rx: tailflow_core::LogReceiver, + limit: usize, + timeout: Duration, +) -> Vec { + let deadline = Instant::now() + timeout; + let mut payloads = Vec::new(); + while payloads.len() < limit && Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(Instant::now()); + match tokio::time::timeout(remaining, rx.recv()).await { + Ok(Ok(r)) => payloads.push(r.payload), + _ => break, + } + } + payloads +} + +// ── RestartPolicy::Never ────────────────────────────────────────────────────── + +#[tokio::test] +async fn never_policy_does_not_restart_on_failure() { + let (tx, rx) = new_bus(); + let src = + ProcessSource::new("test", "echo hello && exit 1").with_restart(RestartPolicy::Never, 50); + + tokio::spawn(async move { Box::new(src).run(tx).await }); + + let payloads = collect(rx, 10, Duration::from_secs(3)).await; + // Should get exactly one "hello", no restart synthetic record. + assert_eq!(payloads.iter().filter(|p| p.as_str() == "hello").count(), 1); + assert!( + !payloads.iter().any(|p| p.contains("[tailflow]")), + "unexpected restart record with Never policy" + ); +} + +// ── RestartPolicy::OnFailure ────────────────────────────────────────────────── + +#[tokio::test] +async fn on_failure_restarts_after_non_zero_exit() { + let (tx, rx) = new_bus(); + // Exits non-zero — should trigger a restart synthetic record. + let src = ProcessSource::new("test", "exit 1").with_restart(RestartPolicy::OnFailure, 50); + + tokio::spawn(async move { Box::new(src).run(tx).await }); + + let payloads = collect(rx, 5, Duration::from_secs(3)).await; + assert!( + payloads + .iter() + .any(|p| p.contains("[tailflow]") && p.contains("restarting")), + "expected restart record after non-zero exit, got: {payloads:?}" + ); +} + +#[tokio::test] +async fn on_failure_does_not_restart_after_clean_exit() { + let (tx, rx) = new_bus(); + // Exits zero — should NOT restart. 
+ let src = ProcessSource::new("test", "echo done").with_restart(RestartPolicy::OnFailure, 50); + + tokio::spawn(async move { Box::new(src).run(tx).await }); + + let payloads = collect(rx, 10, Duration::from_secs(2)).await; + assert!( + !payloads.iter().any(|p| p.contains("[tailflow]")), + "unexpected restart record after clean exit" + ); +} + +// ── RestartPolicy::Always ───────────────────────────────────────────────────── + +#[tokio::test] +async fn always_policy_restarts_after_clean_exit() { + let (tx, rx) = new_bus(); + // Exits zero — Always policy should still restart. + let src = ProcessSource::new("test", "echo hi").with_restart(RestartPolicy::Always, 50); + + tokio::spawn(async move { Box::new(src).run(tx).await }); + + let payloads = collect(rx, 10, Duration::from_secs(3)).await; + assert!( + payloads + .iter() + .any(|p| p.contains("[tailflow]") && p.contains("restarting")), + "expected restart record with Always policy, got: {payloads:?}" + ); +} + +// ── Backoff ─────────────────────────────────────────────────────────────────── + +#[tokio::test] +async fn restart_delay_is_respected() { + let (tx, rx) = new_bus(); + // 200 ms initial delay so we can measure it. + let src = ProcessSource::new("test", "exit 1").with_restart(RestartPolicy::OnFailure, 200); + + tokio::spawn(async move { Box::new(src).run(tx).await }); + + let start = Instant::now(); + // Wait for the first restart synthetic record. + let payloads = collect(rx, 2, Duration::from_secs(3)).await; + let elapsed = start.elapsed(); + + assert!( + payloads.iter().any(|p| p.contains("[tailflow]")), + "expected restart record" + ); + // At least the configured delay elapsed before the record appeared. 
+ assert!( + elapsed >= Duration::from_millis(150), + "restart fired too fast ({elapsed:?})" + ); +} diff --git a/crates/tailflow-core/tests/processor.rs b/crates/tailflow-core/tests/processor.rs index 483a909..4157abe 100644 --- a/crates/tailflow-core/tests/processor.rs +++ b/crates/tailflow-core/tests/processor.rs @@ -81,7 +81,7 @@ async fn filter_none_passes_all_records() { #[tokio::test] async fn filtered_bus_matches_on_source_name() { let (tx, rx) = new_bus(); - let mut out = filtered_bus(rx, Filter::regex("^frontend$").unwrap()); + let mut out = filtered_bus(rx, Filter::none().with_source("frontend")); tx.send(record("backend", "request handled")).unwrap(); tx.send(record("frontend", "compiled in 1.2s")).unwrap(); From 0b7b8303df6030c161a89164818d428c2c4efe06 Mon Sep 17 00:00:00 2001 From: Dennis Paler Date: Sun, 5 Apr 2026 09:42:26 +0800 Subject: [PATCH 4/6] feat: JSON log pretty-printing with TUI toggle and web expand/collapse --- Cargo.lock | 6 +- Cargo.toml | 2 +- README.md | 16 ----- crates/tailflow-core/src/json.rs | 100 ++++++++++++++++++++++++++++++ crates/tailflow-core/src/lib.rs | 1 + crates/tailflow-tui/src/app.rs | 8 +++ crates/tailflow-tui/src/ui/mod.rs | 20 ++++-- web/dist/.gitkeep | 0 web/src/components/LogRow.tsx | 33 +++++++++- 9 files changed, 160 insertions(+), 26 deletions(-) create mode 100644 crates/tailflow-core/src/json.rs create mode 100644 web/dist/.gitkeep diff --git a/Cargo.lock b/Cargo.lock index 48965ba..0e5e932 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1864,7 +1864,7 @@ dependencies = [ [[package]] name = "tailflow-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "async-trait", @@ -1886,7 +1886,7 @@ dependencies = [ [[package]] name = "tailflow-daemon" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "atty", @@ -1907,7 +1907,7 @@ dependencies = [ [[package]] name = "tailflow-tui" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "atty", diff --git a/Cargo.toml b/Cargo.toml 
index 6da4de3..6827fc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ resolver = "2" [workspace.package] -version = "0.1.0" +version = "0.2.0" edition = "2021" license = "MIT" authors = ["TailFlow Contributors"] diff --git a/README.md b/README.md index 3f65bf3..b32aa5f 100644 --- a/README.md +++ b/README.md @@ -423,22 +423,6 @@ tailflow/ --- -## Roadmap - -- [x] Rust core engine with broadcast bus -- [x] ratatui TUI — color-coded sources, regex filter, keyboard scroll -- [x] Docker, process, file, and stdin ingestion sources -- [x] `tailflow.toml` zero-config discovery -- [x] axum SSE daemon with ring buffer -- [x] Preact web dashboard embedded in the daemon binary -- [x] npm / npx distribution — no Rust toolchain required -- [x] Homebrew formula for macOS and Linux -- [x] Server-side `--grep` and `--source` filter flags for the daemon -- [ ] Process restart policy for crashed `[[sources.process]]` entries -- [ ] JSON log pretty-printing — detect structured payloads and expand inline - ---- - ## Contributing Contributions are welcome. Please open an issue before submitting a large PR so we can align on the approach. diff --git a/crates/tailflow-core/src/json.rs b/crates/tailflow-core/src/json.rs new file mode 100644 index 0000000..6e670b9 --- /dev/null +++ b/crates/tailflow-core/src/json.rs @@ -0,0 +1,100 @@ +/// Returns `true` if `s` looks like a JSON object or array. +/// Uses a fast prefix check before attempting a full parse. +pub fn is_json(s: &str) -> bool { + let s = s.trim(); + (s.starts_with('{') || s.starts_with('[')) + && serde_json::from_str::(s).is_ok() +} + +/// Parse `s` as a JSON object and return a compact `key=value` string +/// suitable for single-line TUI display. 
+/// +/// - String values are shown unquoted: `msg=request` +/// - Numbers/bools are shown as-is: `status=200 ok=true` +/// - Nested objects/arrays are inlined as compact JSON: `meta={"host":"x"}` +/// - Returns `None` if `s` is not a valid JSON object (arrays included as +/// pretty JSON). +pub fn flatten_json(s: &str) -> Option { + let s = s.trim(); + if !s.starts_with('{') && !s.starts_with('[') { + return None; + } + let v: serde_json::Value = serde_json::from_str(s).ok()?; + match v { + serde_json::Value::Object(map) => { + let parts: Vec = map + .iter() + .map(|(k, val)| { + let formatted = match val { + serde_json::Value::String(s) => s.clone(), + serde_json::Value::Null => "null".to_string(), + other => other.to_string(), + }; + format!("{k}={formatted}") + }) + .collect(); + Some(parts.join(" ")) + } + // For arrays, fall back to compact single-line JSON + other => serde_json::to_string(&other).ok(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn is_json_detects_object() { + assert!(is_json(r#"{"level":"info","msg":"ok"}"#)); + } + + #[test] + fn is_json_detects_array() { + assert!(is_json(r#"[1,2,3]"#)); + } + + #[test] + fn is_json_rejects_plain_text() { + assert!(!is_json("server started on port 3000")); + assert!(!is_json("ERROR: connection refused")); + } + + #[test] + fn is_json_rejects_invalid_json() { + assert!(!is_json("{not valid}")); + } + + #[test] + fn flatten_json_produces_key_value_pairs() { + let s = r#"{"level":"info","status":200,"ok":true}"#; + let out = flatten_json(s).unwrap(); + assert!(out.contains("level=info")); + assert!(out.contains("status=200")); + assert!(out.contains("ok=true")); + } + + #[test] + fn flatten_json_unquotes_string_values() { + let out = flatten_json(r#"{"msg":"hello world"}"#).unwrap(); + assert_eq!(out, "msg=hello world"); + } + + #[test] + fn flatten_json_inlines_nested_objects() { + let out = flatten_json(r#"{"meta":{"host":"x"}}"#).unwrap(); + assert!(out.starts_with("meta=")); + 
assert!(out.contains("host")); + } + + #[test] + fn flatten_json_returns_none_for_plain_text() { + assert!(flatten_json("not json").is_none()); + } + + #[test] + fn flatten_json_handles_array() { + let out = flatten_json(r#"[1,2,3]"#).unwrap(); + assert_eq!(out, "[1,2,3]"); + } +} diff --git a/crates/tailflow-core/src/lib.rs b/crates/tailflow-core/src/lib.rs index 35d9a1d..10afefa 100644 --- a/crates/tailflow-core/src/lib.rs +++ b/crates/tailflow-core/src/lib.rs @@ -1,5 +1,6 @@ pub mod config; pub mod ingestion; +pub mod json; pub mod processor; use chrono::{DateTime, Utc}; diff --git a/crates/tailflow-tui/src/app.rs b/crates/tailflow-tui/src/app.rs index 95d72f5..414a92b 100644 --- a/crates/tailflow-tui/src/app.rs +++ b/crates/tailflow-tui/src/app.rs @@ -19,6 +19,8 @@ pub struct App { rx: LogReceiver, pub scroll: usize, pub source_colors: SourceColorMap, + /// When true, JSON payloads are shown as flattened key=value pairs. + pub pretty_json: bool, } pub struct SourceColorMap { @@ -68,6 +70,7 @@ impl App { rx, scroll: 0, source_colors: SourceColorMap::new(), + pretty_json: false, } } @@ -141,6 +144,11 @@ impl App { self.scroll = usize::MAX; // snap to bottom on next render } + // Toggle JSON pretty-printing + (false, KeyCode::Char('p')) => { + self.pretty_json = !self.pretty_json; + } + // Filter mode input (true, KeyCode::Esc) => { self.filter_mode = false; diff --git a/crates/tailflow-tui/src/ui/mod.rs b/crates/tailflow-tui/src/ui/mod.rs index 6716c22..dfe6915 100644 --- a/crates/tailflow-tui/src/ui/mod.rs +++ b/crates/tailflow-tui/src/ui/mod.rs @@ -6,7 +6,7 @@ use ratatui::{ widgets::{Block, Borders, List, ListItem, Paragraph}, Frame, }; -use tailflow_core::LogLevel; +use tailflow_core::{json::flatten_json, LogLevel}; pub fn draw(f: &mut Frame, app: &mut App) { let area = f.area(); @@ -56,6 +56,7 @@ pub fn draw(f: &mut Frame, app: &mut App) { let scroll = app.scroll; // ── Collect visible records as owned data (drops borrow on app.records) ─ + let pretty_json = 
app.pretty_json; let visible_data: Vec<(String, String, LogLevel, String)> = app .records .iter() @@ -63,11 +64,16 @@ pub fn draw(f: &mut Frame, app: &mut App) { .skip(scroll) .take(list_height) .map(|r| { + let payload = if pretty_json { + flatten_json(&r.payload).unwrap_or_else(|| r.payload.clone()) + } else { + r.payload.clone() + }; ( r.timestamp.format("%H:%M:%S%.3f").to_string(), r.source.clone(), r.level, - r.payload.clone(), + payload, ) }) .collect(); @@ -79,9 +85,15 @@ pub fn draw(f: &mut Frame, app: &mut App) { .collect(); // ── Header ───────────────────────────────────────────────────────────── + let json_label = if app.pretty_json { + "p:json-on" + } else { + "p:json-off" + }; let header_text = format!( - " TailFlow | {} records | Press / to filter | q to quit", - app.records.len() + " TailFlow | {} records | / filter | {} | q quit", + app.records.len(), + json_label, ); let header = Paragraph::new(header_text) .style(Style::default().fg(Color::White).bg(Color::DarkGray)) diff --git a/web/dist/.gitkeep b/web/dist/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/web/src/components/LogRow.tsx b/web/src/components/LogRow.tsx index 5148714..87b3f3a 100644 --- a/web/src/components/LogRow.tsx +++ b/web/src/components/LogRow.tsx @@ -1,3 +1,4 @@ +import { useState } from 'preact/hooks' import type { LogRecord } from '../types' import { sourceColor, LEVEL_COLOR } from '../types' @@ -6,7 +7,6 @@ interface Props { } function formatTs(iso: string): string { - // Show only HH:MM:SS.mmm from the ISO timestamp const d = new Date(iso) const hh = String(d.getHours()).padStart(2, '0') const mm = String(d.getMinutes()).padStart(2, '0') @@ -15,9 +15,21 @@ function formatTs(iso: string): string { return `${hh}:${mm}:${ss}.${ms}` } +function tryParseJson(s: string): object | null { + const t = s.trim() + if (t[0] !== '{' && t[0] !== '[') return null + try { + return JSON.parse(t) + } catch { + return null + } +} + export function LogRow({ record }: Props) 
{ const sc = sourceColor(record.source) const lc = LEVEL_COLOR[record.level] + const parsed = tryParseJson(record.payload) + const [expanded, setExpanded] = useState(false) return (
@@ -28,7 +40,24 @@ export function LogRow({ record }: Props) { {record.level.slice(0, 5).toUpperCase().padEnd(5, ' ')} - {record.payload} + {parsed ? ( + + + {expanded ? ( +
{JSON.stringify(parsed, null, 2)}
+ ) : ( + {record.payload} + )} +
+ ) : ( + {record.payload} + )}
) } From cd6748d7e03b7c70bd84feb4e8f4187bd92227ed Mon Sep 17 00:00:00 2001 From: Dennis Paler Date: Sun, 5 Apr 2026 09:47:46 +0800 Subject: [PATCH 5/6] =?UTF-8?q?fix:=20ring=20buffer=20O(n)=E2=86=92VecDequ?= =?UTF-8?q?e,=20TUI=20layout=20guard,=20mutex=20poison=20recovery,=20cache?= =?UTF-8?q?=20filter=20regex,=20log=20task=20panics,=20SSE=20serialize=20e?= =?UTF-8?q?rror?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/tailflow-core/src/ingestion/process.rs | 8 ++++++-- crates/tailflow-daemon/src/routes.rs | 13 ++++++++----- crates/tailflow-daemon/src/state.rs | 15 +++++++++------ crates/tailflow-tui/src/app.rs | 6 ++++++ crates/tailflow-tui/src/ui/mod.rs | 13 ++++++------- 5 files changed, 35 insertions(+), 20 deletions(-) diff --git a/crates/tailflow-core/src/ingestion/process.rs b/crates/tailflow-core/src/ingestion/process.rs index 2b008c9..8033626 100644 --- a/crates/tailflow-core/src/ingestion/process.rs +++ b/crates/tailflow-core/src/ingestion/process.rs @@ -81,8 +81,12 @@ impl ProcessSource { }); let status = child.wait().await?; - stdout_task.await.ok(); - stderr_task.await.ok(); + if let Err(e) = stdout_task.await { + tracing::warn!(label = %self.label, err = ?e, "stdout reader task panicked"); + } + if let Err(e) = stderr_task.await { + tracing::warn!(label = %self.label, err = ?e, "stderr reader task panicked"); + } Ok(status) } diff --git a/crates/tailflow-daemon/src/routes.rs b/crates/tailflow-daemon/src/routes.rs index a77b27c..9a7ef8a 100644 --- a/crates/tailflow-daemon/src/routes.rs +++ b/crates/tailflow-daemon/src/routes.rs @@ -75,10 +75,13 @@ async fn sse_handler( let filter = params.into_filter(); let rx = state.tx.subscribe(); let stream = BroadcastStream::new(rx).filter_map(move |res| match res { - Ok(record) if filter.matches(&record) => { - let data = serde_json::to_string(&record).unwrap_or_default(); - Some(Ok(Event::default().data(data))) - } + Ok(record) if filter.matches(&record) => 
match serde_json::to_string(&record) { + Ok(data) => Some(Ok(Event::default().data(data))), + Err(e) => { + tracing::error!(err = %e, "failed to serialize log record for SSE"); + None + } + }, Ok(_) => None, Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => { tracing::warn!(dropped = n, "SSE client lagged"); @@ -99,7 +102,7 @@ async fn records_handler( let records: Vec<_> = state .ring .lock() - .unwrap() + .unwrap_or_else(|p| p.into_inner()) // recover from poisoned mutex .iter() .filter(|r| filter.matches(r)) .cloned() diff --git a/crates/tailflow-daemon/src/state.rs b/crates/tailflow-daemon/src/state.rs index b02c5b6..146c03e 100644 --- a/crates/tailflow-daemon/src/state.rs +++ b/crates/tailflow-daemon/src/state.rs @@ -1,4 +1,7 @@ -use std::sync::{Arc, Mutex}; +use std::{ + collections::VecDeque, + sync::{Arc, Mutex}, +}; use tailflow_core::{LogReceiver, LogRecord, LogSender}; use tokio::sync::broadcast; @@ -9,7 +12,7 @@ pub struct AppState { /// Subscribe to the live stream by calling `tx.subscribe()`. pub tx: LogSender, /// Rolling buffer of the last RING_SIZE records (for `/api/records`). 
- pub ring: Mutex<Vec<LogRecord>>, + pub ring: Mutex<VecDeque<LogRecord>>, } impl AppState { @@ -18,7 +21,7 @@ impl AppState { let (tx, _) = broadcast::channel(tailflow_core::BUS_CAPACITY); let state = Arc::new(AppState { tx: tx.clone(), - ring: Mutex::new(Vec::with_capacity(RING_SIZE)), + ring: Mutex::new(VecDeque::with_capacity(RING_SIZE)), }); let state2 = state.clone(); @@ -27,11 +30,11 @@ match source_rx.recv().await { Ok(record) => { { - let mut buf = state2.ring.lock().unwrap(); + let mut buf = state2.ring.lock().unwrap_or_else(|p| p.into_inner()); if buf.len() >= RING_SIZE { - buf.remove(0); + buf.pop_front(); // O(1) vs Vec::remove(0) O(n) } - buf.push(record.clone()); + buf.push_back(record.clone()); } let _ = tx.send(record); } diff --git a/crates/tailflow-tui/src/app.rs b/crates/tailflow-tui/src/app.rs index 414a92b..e487da0 100644 --- a/crates/tailflow-tui/src/app.rs +++ b/crates/tailflow-tui/src/app.rs @@ -6,6 +6,7 @@ use crossterm::{ terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; use ratatui::{backend::CrosstermBackend, Terminal}; +use regex::Regex; use std::{io, time::Duration}; use tailflow_core::{LogReceiver, LogRecord}; use tokio::sync::broadcast; @@ -15,6 +16,8 @@ const MAX_RECORDS: usize = 2000; pub struct App { pub records: Vec<LogRecord>, pub filter: String, + /// Compiled regex for `filter`; kept in sync whenever `filter` changes.
+ pub filter_re: Option<Regex>, pub filter_mode: bool, rx: LogReceiver, pub scroll: usize, @@ -66,6 +69,7 @@ impl App { Self { records: Vec::with_capacity(MAX_RECORDS), filter: String::new(), + filter_re: None, filter_mode: false, rx, scroll: 0, @@ -159,9 +163,11 @@ impl App { } (true, KeyCode::Backspace) => { self.filter.pop(); + self.filter_re = Regex::new(&self.filter).ok(); } (true, KeyCode::Char(c)) => { self.filter.push(c); + self.filter_re = Regex::new(&self.filter).ok(); } _ => {} diff --git a/crates/tailflow-tui/src/ui/mod.rs b/crates/tailflow-tui/src/ui/mod.rs index dfe6915..21aee74 100644 --- a/crates/tailflow-tui/src/ui/mod.rs +++ b/crates/tailflow-tui/src/ui/mod.rs @@ -20,21 +20,20 @@ pub fn draw(f: &mut Frame, app: &mut App) { ]) .split(area); + // Guard against degenerate terminal sizes + if chunks.len() < 3 { + return; + } let list_height = chunks[1].height as usize; - // ── Build filter predicate ───────────────────────────────────────────── - let filter_re = if !app.filter.is_empty() { - regex::Regex::new(&app.filter).ok() - } else { - None - }; + // ── Build filter predicate (uses pre-compiled regex from App) ────────── let filter_lower = app.filter.to_lowercase(); let matches = |payload: &str, source: &str| -> bool { if app.filter.is_empty() { return true; } - if let Some(re) = &filter_re { + if let Some(re) = &app.filter_re { re.is_match(payload) || re.is_match(source) } else { payload.to_lowercase().contains(&filter_lower) From bcba2716a9ae8475ebdf4352f19a3060412c3218 Mon Sep 17 00:00:00 2001 From: Dennis Paler Date: Sun, 5 Apr 2026 09:50:49 +0800 Subject: =?UTF-8?q?docs:=20clean=20up=20roadmap=20=E2=80=94=20?= =?UTF-8?q?remove=20completed=20items,=20add=20high-impact=20next=20steps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/README.md b/README.md index b32aa5f..9b40926 100644 --- a/README.md +++
b/README.md @@ -423,6 +423,32 @@ tailflow/ --- +## Roadmap + +### Near-term + +- [ ] **Web dashboard search bar** — live `?grep=` filter input in the UI so users don't need to hand-craft query params +- [ ] **Log export** — download filtered records as `.ndjson` or `.txt` from the web dashboard +- [ ] **Graceful shutdown** — SIGTERM drains in-flight records and flushes the ring buffer before exit +- [ ] **`--follow` flag for files** — tail from the end by default; `--no-follow` reads the whole file and exits (like `tail -f` vs `cat`) +- [ ] **Docker Compose integration** — auto-discover services from a `docker-compose.yml` in the project root without listing them manually + +### High-impact + +- [ ] **Log level filter toggles in TUI** — press `e`/`w`/`i`/`d` to show/hide Error, Warn, Info, Debug levels; currently only regex filter exists +- [ ] **Persistent log buffer to disk** — optional SQLite ring buffer so logs survive daemon restarts and can be queried historically +- [ ] **`[[sources.http]]` webhook receiver** — accept POST payloads from external services (Vercel, Render, Fly.io log drains) and ingest them as a named source +- [ ] **Web dashboard dark/light theme toggle** — currently hardcoded dark; one `prefers-color-scheme` CSS variable swap would cover both +- [ ] **OpenTelemetry / OTLP exporter** — forward collected logs to a collector (Grafana Cloud, Honeycomb, Datadog) for teams who want cloud retention without changing their local workflow + +### Speculative / community interest + +- [ ] **TUI split-pane view** — side-by-side panes showing two sources simultaneously; useful when debugging a frontend + backend at the same time +- [ ] **Plugin system for custom sources** — WASM or subprocess-based source plugins so users can add sources (Kafka, Redis pub/sub, AWS CloudWatch) without forking +- [ ] **AI log summarisation** — `s` key in TUI calls a local LLM (Ollama) or cloud API to summarise the last N error records into a plain-English diagnosis + +--- + 
## Contributing Contributions are welcome. Please open an issue before submitting a large PR so we can align on the approach.