diff --git a/client/src/hooks/useJobEvents.js b/client/src/hooks/useJobEvents.js
new file mode 100644
index 0000000..7aa152e
--- /dev/null
+++ b/client/src/hooks/useJobEvents.js
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2026 by Christian Kellner.
+ * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
+ */
+
+import { useEffect, useRef, useState } from 'react'
+
+const BASE = import.meta.env.VITE_API_BASE ?? '/api'
+
+function getToken() {
+ return new URLSearchParams(window.location.search).get('token') ?? ''
+}
+
+export function useJobEvents({ onRunStarted, onRunFinished }) {
+ const [connected, setConnected] = useState(false)
+ const onStartedRef = useRef(onRunStarted)
+ const onFinishedRef = useRef(onRunFinished)
+
+ // Sync latest callbacks each render so the EventSource effect below never goes stale
+ useEffect(() => {
+ onStartedRef.current = onRunStarted
+ onFinishedRef.current = onRunFinished
+ })
+
+ useEffect(() => {
+ const token = getToken()
+ const url = `${BASE}/events${token ? `?token=${encodeURIComponent(token)}` : ''}`
+ const es = new EventSource(url)
+
+ es.onopen = () => setConnected(true)
+ // EventSource auto-retries on transient errors; connected returns to true on onopen
+ es.onerror = () => setConnected(false)
+ es.addEventListener('run:started', (e) => {
+ try { onStartedRef.current(JSON.parse(e.data)) } catch { /* malformed payload */ }
+ })
+ es.addEventListener('run:finished', (e) => {
+ try { onFinishedRef.current(JSON.parse(e.data)) } catch { /* malformed payload */ }
+ })
+
+ return () => {
+ es.close()
+ setConnected(false)
+ }
+ }, [])
+
+ return { connected }
+}
diff --git a/client/src/hooks/useJobs.js b/client/src/hooks/useJobs.js
index a9df4c5..2594333 100644
--- a/client/src/hooks/useJobs.js
+++ b/client/src/hooks/useJobs.js
@@ -56,5 +56,18 @@ export function useJobs() {
return res.data
}, [])
- return { jobs, isLoading, error, createJob, updateJob, deleteJob, toggleJob, triggerRun, refresh: fetchJobs }
+ const updateRunStarted = useCallback(({ jobId }) => {
+ setJobs(prev => prev.map(j => j.id === jobId ? { ...j, last_run_status: 'running' } : j))
+ }, [])
+
+ const updateRunFinished = useCallback(({ jobId, status }) => {
+ setJobs(prev => prev.map(j => j.id === jobId ? { ...j, last_run_status: status } : j))
+ }, [])
+
+ return {
+ jobs, isLoading, error,
+ createJob, updateJob, deleteJob, toggleJob, triggerRun,
+ refresh: fetchJobs,
+ updateRunStarted, updateRunFinished,
+ }
}
diff --git a/client/src/hooks/useRunHistory.js b/client/src/hooks/useRunHistory.js
index 9fc274d..1f8dc9f 100644
--- a/client/src/hooks/useRunHistory.js
+++ b/client/src/hooks/useRunHistory.js
@@ -45,5 +45,15 @@ export function useRunHistory(jobId) {
fetchRuns(offset + limit)
}, [fetchRuns, offset, limit])
- return { runs, total, isLoading, error, loadMore, hasMore: runs.length < total, refresh: () => fetchRuns(0) }
+ // Intentionally resets to page 1 so the new run appears at the top
+ const handleRunFinished = useCallback((data) => {
+ if (data.jobId === jobId) fetchRuns(0)
+ }, [jobId, fetchRuns])
+
+ return {
+ runs, total, isLoading, error,
+ loadMore, hasMore: runs.length < total,
+ refresh: () => fetchRuns(0),
+ handleRunFinished,
+ }
}
diff --git a/server/src/app.js b/server/src/app.js
index dc81916..1d94daf 100644
--- a/server/src/app.js
+++ b/server/src/app.js
@@ -10,6 +10,7 @@ import { createJobsRouter } from './routes/jobs.js'
import { createRunsRouter } from './routes/runs.js'
import { errorHandler } from './middleware/errorHandler.js'
import { gatewayTokenMiddleware } from './middleware/gatewayToken.js'
+import { eventBus } from './services/eventBus.js'
const pkg = JSON.parse(fs.readFileSync(new URL('../../package.json', import.meta.url), 'utf8'))
@@ -32,6 +33,40 @@ export function createApp(scheduler) {
app.use('/api/jobs', createJobsRouter(scheduler))
app.use('/api/runs', createRunsRouter())
+ app.get('/api/events', (req, res) => {
+ res.setHeader('Content-Type', 'text/event-stream')
+ res.setHeader('Cache-Control', 'no-cache')
+ res.setHeader('Connection', 'keep-alive')
+ res.setHeader('X-Accel-Buffering', 'no')
+ res.flushHeaders()
+
+ function send(eventName, data) {
+ if (!res.writableEnded) {
+ res.write(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
+ }
+ }
+
+ const onStarted = (data) => send('run:started', data)
+ const onFinished = (data) => send('run:finished', data)
+
+ eventBus.on('run:started', onStarted)
+ eventBus.on('run:finished', onFinished)
+
+ let cleaned = false
+ function cleanup() {
+ if (cleaned) return
+ cleaned = true
+ eventBus.off('run:started', onStarted)
+ eventBus.off('run:finished', onFinished)
+ }
+
+ req.on('close', cleanup)
+ if (res.socket) {
+ res.socket.on('end', cleanup)
+ res.socket.on('close', cleanup)
+ }
+ })
+
app.get('/api/validate-path', (req, res) => {
const filePath = req.query.path || ''
if (!filePath.trim()) return res.json({ exists: false, isFile: false, executable: false })
diff --git a/server/src/middleware/gatewayToken.js b/server/src/middleware/gatewayToken.js
index 908b68e..4eca091 100644
--- a/server/src/middleware/gatewayToken.js
+++ b/server/src/middleware/gatewayToken.js
@@ -9,7 +9,8 @@ export function gatewayTokenMiddleware(req, res, next) {
const token = process.env.GATEWAY_TOKEN
if (!token) return next()
- const provided = req.headers['x-gateway-token']
+ // EventSource cannot set custom headers, so SSE clients send the token as ?token=...
+ const provided = req.headers['x-gateway-token'] || req.query.token || ''
if (!provided) {
return res.status(401).json({ error: 'Unauthorized: invalid or missing gateway token' })
}
diff --git a/server/src/scheduler/executor.js b/server/src/scheduler/executor.js
index c091ec6..6427f8a 100644
--- a/server/src/scheduler/executor.js
+++ b/server/src/scheduler/executor.js
@@ -7,6 +7,7 @@ import { spawn } from 'child_process'
import { getDb } from '../db/index.js'
import { send as ntfySend } from '../services/ntfy.js'
import { logger } from '../logger.js'
+import { eventBus } from '../services/eventBus.js'
const MAX_OUTPUT_BYTES = 512 * 1024
@@ -21,6 +22,7 @@ export function run(job, triggeredBy = 'scheduler') {
const runId = runResult.lastInsertRowid
logger.info({ jobId: job.id, runId, triggeredBy }, `Job "${job.name}" started`)
+ eventBus.emit('run:started', { jobId: job.id, runId, triggeredBy })
const child = job.command_type === 'inline'
? spawn('/bin/sh', ['-c', job.command], { stdio: ['ignore', 'pipe', 'pipe'] })
@@ -67,13 +69,12 @@ export function run(job, triggeredBy = 'scheduler') {
WHERE id = ?
`).run(status, exitCode, stdout, stderr, duration, runId)
- const maxRuns = Number(process.env.KEEP_MAX_FOR_HISTORY) || 5
db.prepare(`
DELETE FROM runs
WHERE job_id = ? AND id NOT IN (
SELECT id FROM runs WHERE job_id = ? ORDER BY id DESC LIMIT ?
)
- `).run(job.id, job.id, maxRuns)
+ `).run(job.id, job.id, Number(process.env.KEEP_MAX_FOR_HISTORY) || 5)
if (status === 'error' && job.ntfy_enabled && job.ntfy_on_error) {
ntfySend(job, { status, exitCode, stderr }).catch(() => {})
@@ -81,6 +82,7 @@ export function run(job, triggeredBy = 'scheduler') {
ntfySend(job, { status, exitCode }).catch(() => {})
}
+ eventBus.emit('run:finished', { jobId: job.id, runId, status, exitCode, duration_ms: duration })
logger.info({ jobId: job.id, runId, status, exitCode, duration }, `Job "${job.name}" finished`)
})
diff --git a/server/src/services/eventBus.js b/server/src/services/eventBus.js
new file mode 100644
index 0000000..0409875
--- /dev/null
+++ b/server/src/services/eventBus.js
@@ -0,0 +1,10 @@
+/*
+ * Copyright (c) 2026 by Christian Kellner.
+ * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
+ */
+
+import { EventEmitter } from 'events'
+
+export const eventBus = new EventEmitter()
+eventBus.setMaxListeners(0)
+
diff --git a/server/tests/events.test.js b/server/tests/events.test.js
new file mode 100644
index 0000000..285e6a7
--- /dev/null
+++ b/server/tests/events.test.js
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2026 by Christian Kellner.
+ * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
+ */
+
+import { describe, it, expect, afterEach, beforeEach } from 'vitest'
+import http from 'http'
+import { makeApp } from './setup.js'
+import { eventBus } from '../src/services/eventBus.js'
+
+let server
+let port
+
+beforeEach(() => {
+ server = makeApp().listen(0)
+ port = server.address().port
+})
+
+afterEach(async () => {
+ await new Promise(r => server.close(r))
+})
+
+function connectSSE() {
+ return new Promise((resolve, reject) => {
+ const req = http.get(`http://localhost:${port}/api/events`, (res) => {
+ resolve({ req, res })
+ })
+ req.on('error', (e) => { if (e.code !== 'ECONNRESET') reject(e) })
+ setTimeout(() => reject(new Error('SSE connect timeout')), 2000)
+ })
+}
+
+function collectData(res, count = 1, timeoutMs = 2000) {
+ return new Promise((resolve, reject) => {
+ const chunks = []
+ const timer = setTimeout(() => reject(new Error('collectData timeout')), timeoutMs)
+ res.on('data', (chunk) => {
+ chunks.push(chunk.toString())
+ if (chunks.length >= count) { clearTimeout(timer); resolve(chunks.join('')) }
+ })
+ })
+}
+
+describe('GET /api/events', () => {
+ it('responds with SSE headers', async () => {
+ const { req, res } = await connectSSE()
+ expect(res.headers['content-type']).toBe('text/event-stream')
+ expect(res.headers['cache-control']).toBe('no-cache')
+ req.destroy()
+ })
+
+ it('sends run:started event when bus emits', async () => {
+ const { req, res } = await connectSSE()
+ const dataPromise = collectData(res)
+ await new Promise(r => setTimeout(r, 30))
+ eventBus.emit('run:started', { jobId: 1, runId: 42, triggeredBy: 'manual' })
+ const raw = await dataPromise
+ expect(raw).toContain('event: run:started')
+ expect(raw).toContain('"jobId":1')
+ expect(raw).toContain('"runId":42')
+ req.destroy()
+ })
+
+ it('sends run:finished event when bus emits', async () => {
+ const { req, res } = await connectSSE()
+ const dataPromise = collectData(res)
+ await new Promise(r => setTimeout(r, 30))
+ eventBus.emit('run:finished', { jobId: 2, runId: 7, status: 'success', exitCode: 0, duration_ms: 500 })
+ const raw = await dataPromise
+ expect(raw).toContain('event: run:finished')
+ expect(raw).toContain('"status":"success"')
+ req.destroy()
+ })
+
+ it('removes bus listeners when client disconnects', async () => {
+ const listenersBefore = eventBus.listenerCount('run:started')
+ const { req, res } = await connectSSE()
+ expect(eventBus.listenerCount('run:started')).toBe(listenersBefore + 1)
+ req.destroy()
+ res.resume()
+ await new Promise(r => setTimeout(r, 50))
+ expect(eventBus.listenerCount('run:started')).toBe(listenersBefore)
+ })
+})
diff --git a/server/tests/gatewayToken.test.js b/server/tests/gatewayToken.test.js
new file mode 100644
index 0000000..0316ada
--- /dev/null
+++ b/server/tests/gatewayToken.test.js
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2026 by Christian Kellner.
+ * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
+ */
+
+import { describe, it, expect, beforeEach, afterEach } from 'vitest'
+import request from 'supertest'
+import { makeDb, makeApp } from './setup.js'
+
+let app
+
+beforeEach(() => {
+ makeDb()
+ app = makeApp()
+ process.env.GATEWAY_TOKEN = 'secret123'
+})
+
+afterEach(() => {
+ delete process.env.GATEWAY_TOKEN
+})
+
+describe('gatewayTokenMiddleware with query param', () => {
+ it('accepts valid token from x-gateway-token header', async () => {
+ const res = await request(app)
+ .get('/api/jobs')
+ .set('X-Gateway-Token', 'secret123')
+ expect(res.status).toBe(200)
+ })
+
+ it('accepts valid token from ?token query param', async () => {
+ const res = await request(app)
+ .get('/api/jobs?token=secret123')
+ expect(res.status).toBe(200)
+ })
+
+ it('rejects wrong token from query param', async () => {
+ const res = await request(app)
+ .get('/api/jobs?token=wrong')
+ expect(res.status).toBe(401)
+ })
+
+ it('rejects request with no token at all', async () => {
+ const res = await request(app).get('/api/jobs')
+ expect(res.status).toBe(401)
+ })
+})