diff --git a/client/src/App.jsx b/client/src/App.jsx index e868cec..3f35059 100644 --- a/client/src/App.jsx +++ b/client/src/App.jsx @@ -11,6 +11,8 @@ import { DeleteConfirm } from './components/jobs/DeleteConfirm.jsx' import { RunHistoryPanel } from './components/runs/RunHistoryPanel.jsx' import { UnauthorizedPage } from './components/UnauthorizedPage.jsx' import { useJobs } from './hooks/useJobs.js' +import { useRunHistory } from './hooks/useRunHistory.js' +import { useJobEvents } from './hooks/useJobEvents.js' import { useToast } from './components/ui/Toast.jsx' export default function App() { @@ -27,17 +29,38 @@ export default function App() { .catch(() => setAuthState('ok')) }, []) - const { jobs, isLoading, error, createJob, updateJob, deleteJob, toggleJob, triggerRun } = useJobs() + const { + jobs, isLoading, error, + createJob, updateJob, deleteJob, toggleJob, triggerRun, + updateRunStarted, updateRunFinished, + } = useJobs() const { addToast } = useToast() const [dialogState, setDialogState] = useState(null) const [deleteTarget, setDeleteTarget] = useState(null) const [historyJob, setHistoryJob] = useState(null) + const runHistory = useRunHistory(historyJob?.id) + + useJobEvents({ + onRunStarted: updateRunStarted, + onRunFinished: (data) => { + updateRunFinished(data) + runHistory.handleRunFinished(data) + }, + }) + const handleNew = () => setDialogState({ mode: 'create' }) const handleEdit = (job) => setDialogState({ mode: 'edit', job }) const handleDelete = (job) => setDeleteTarget(job) - const handleHistory = (job) => setHistoryJob(job) + const handleHistory = (job) => { + setHistoryJob(prev => { + // When the same job is re-selected the jobId doesn't change, so useRunHistory's + // useEffect won't re-fire. Refresh explicitly for that case only. + if (prev?.id === job.id) runHistory.refresh() + return job + }) + } const handleSave = async (formData) => { if (dialogState.mode === 'create') { @@ -72,7 +95,17 @@ export default function App() { if (authState === 'unauthorized') return const sidebar = historyJob ? ( - setHistoryJob(null)} /> + setHistoryJob(null)} + /> ) : null return ( diff --git a/client/src/components/runs/RunHistoryPanel.jsx b/client/src/components/runs/RunHistoryPanel.jsx index b5e97a0..b5ba26b 100644 --- a/client/src/components/runs/RunHistoryPanel.jsx +++ b/client/src/components/runs/RunHistoryPanel.jsx @@ -4,16 +4,13 @@ */ import { X, RefreshCw, History } from 'lucide-react' -import { useRunHistory } from '../../hooks/useRunHistory.js' import { RunRecord } from './RunRecord.jsx' import { LoadingSpinner } from '../ui/LoadingSpinner.jsx' import { EmptyState } from '../ui/EmptyState.jsx' import { Button } from '../ui/Button.jsx' import { Tooltip } from '../ui/Tooltip.jsx' -export function RunHistoryPanel({ job, onClose }) { - const { runs, total, isLoading, error, loadMore, hasMore, refresh } = useRunHistory(job?.id) - +export function RunHistoryPanel({ job, runs, total, isLoading, error, loadMore, hasMore, refresh, onClose }) { return (
diff --git a/client/src/hooks/useJobEvents.js b/client/src/hooks/useJobEvents.js new file mode 100644 index 0000000..7aa152e --- /dev/null +++ b/client/src/hooks/useJobEvents.js @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2026 by Christian Kellner. + * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause + */ + +import { useEffect, useRef, useState } from 'react' + +const BASE = import.meta.env.VITE_API_BASE ?? '/api' + +function getToken() { + return new URLSearchParams(window.location.search).get('token') ?? '' +} + +export function useJobEvents({ onRunStarted, onRunFinished }) { + const [connected, setConnected] = useState(false) + const onStartedRef = useRef(onRunStarted) + const onFinishedRef = useRef(onRunFinished) + + // Sync latest callbacks each render so the EventSource effect below never goes stale + useEffect(() => { + onStartedRef.current = onRunStarted + onFinishedRef.current = onRunFinished + }) + + useEffect(() => { + const token = getToken() + const url = `${BASE}/events${token ? `?token=${encodeURIComponent(token)}` : ''}` + const es = new EventSource(url) + + es.onopen = () => setConnected(true) + // EventSource auto-retries on transient errors; connected returns to true on onopen + es.onerror = () => setConnected(false) + es.addEventListener('run:started', (e) => { + try { onStartedRef.current(JSON.parse(e.data)) } catch { /* malformed payload */ } + }) + es.addEventListener('run:finished', (e) => { + try { onFinishedRef.current(JSON.parse(e.data)) } catch { /* malformed payload */ } + }) + + return () => { + es.close() + setConnected(false) + } + }, []) + + return { connected } +} diff --git a/client/src/hooks/useJobs.js b/client/src/hooks/useJobs.js index a9df4c5..2594333 100644 --- a/client/src/hooks/useJobs.js +++ b/client/src/hooks/useJobs.js @@ -56,5 +56,18 @@ export function useJobs() { return res.data }, []) - return { jobs, isLoading, error, createJob, updateJob, deleteJob, toggleJob, triggerRun, refresh: fetchJobs } + const updateRunStarted = useCallback(({ jobId }) => { + setJobs(prev => prev.map(j => j.id === jobId ? { ...j, last_run_status: 'running' } : j)) + }, []) + + const updateRunFinished = useCallback(({ jobId, status }) => { + setJobs(prev => prev.map(j => j.id === jobId ? { ...j, last_run_status: status } : j)) + }, []) + + return { + jobs, isLoading, error, + createJob, updateJob, deleteJob, toggleJob, triggerRun, + refresh: fetchJobs, + updateRunStarted, updateRunFinished, + } } diff --git a/client/src/hooks/useRunHistory.js b/client/src/hooks/useRunHistory.js index 9fc274d..1f8dc9f 100644 --- a/client/src/hooks/useRunHistory.js +++ b/client/src/hooks/useRunHistory.js @@ -45,5 +45,15 @@ export function useRunHistory(jobId) { fetchRuns(offset + limit) }, [fetchRuns, offset, limit]) - return { runs, total, isLoading, error, loadMore, hasMore: runs.length < total, refresh: () => fetchRuns(0) } + // Intentionally resets to page 1 so the new run appears at the top + const handleRunFinished = useCallback((data) => { + if (data.jobId === jobId) fetchRuns(0) + }, [jobId, fetchRuns]) + + return { + runs, total, isLoading, error, + loadMore, hasMore: runs.length < total, + refresh: () => fetchRuns(0), + handleRunFinished, + } } diff --git a/server/src/app.js b/server/src/app.js index dc81916..1d94daf 100644 --- a/server/src/app.js +++ b/server/src/app.js @@ -10,6 +10,7 @@ import { createJobsRouter } from './routes/jobs.js' import { createRunsRouter } from './routes/runs.js' import { errorHandler } from './middleware/errorHandler.js' import { gatewayTokenMiddleware } from './middleware/gatewayToken.js' +import { eventBus } from './services/eventBus.js' const pkg = JSON.parse(fs.readFileSync(new URL('../../package.json', import.meta.url), 'utf8')) @@ -32,6 +33,40 @@ export function createApp(scheduler) { app.use('/api/jobs', createJobsRouter(scheduler)) app.use('/api/runs', createRunsRouter()) + app.get('/api/events', (req, res) => { + res.setHeader('Content-Type', 'text/event-stream') + res.setHeader('Cache-Control', 'no-cache') + res.setHeader('Connection', 'keep-alive') + res.setHeader('X-Accel-Buffering', 'no') + res.flushHeaders() + + function send(eventName, data) { + if (!res.writableEnded) { + res.write(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`) + } + } + + const onStarted = (data) => send('run:started', data) + const onFinished = (data) => send('run:finished', data) + + eventBus.on('run:started', onStarted) + eventBus.on('run:finished', onFinished) + + let cleaned = false + function cleanup() { + if (cleaned) return + cleaned = true + eventBus.off('run:started', onStarted) + eventBus.off('run:finished', onFinished) + } + + req.on('close', cleanup) + if (res.socket) { + res.socket.on('end', cleanup) + res.socket.on('close', cleanup) + } + }) + app.get('/api/validate-path', (req, res) => { const filePath = req.query.path || '' if (!filePath.trim()) return res.json({ exists: false, isFile: false, executable: false }) diff --git a/server/src/middleware/gatewayToken.js b/server/src/middleware/gatewayToken.js index 908b68e..4eca091 100644 --- a/server/src/middleware/gatewayToken.js +++ b/server/src/middleware/gatewayToken.js @@ -9,7 +9,8 @@ export function gatewayTokenMiddleware(req, res, next) { const token = process.env.GATEWAY_TOKEN if (!token) return next() - const provided = req.headers['x-gateway-token'] + // EventSource cannot set custom headers, so SSE clients send the token as ?token=... + const provided = req.headers['x-gateway-token'] || req.query.token || '' if (!provided) { return res.status(401).json({ error: 'Unauthorized: invalid or missing gateway token' }) } diff --git a/server/src/scheduler/executor.js b/server/src/scheduler/executor.js index c091ec6..6427f8a 100644 --- a/server/src/scheduler/executor.js +++ b/server/src/scheduler/executor.js @@ -7,6 +7,7 @@ import { spawn } from 'child_process' import { getDb } from '../db/index.js' import { send as ntfySend } from '../services/ntfy.js' import { logger } from '../logger.js' +import { eventBus } from '../services/eventBus.js' const MAX_OUTPUT_BYTES = 512 * 1024 @@ -21,6 +22,7 @@ export function run(job, triggeredBy = 'scheduler') { const runId = runResult.lastInsertRowid logger.info({ jobId: job.id, runId, triggeredBy }, `Job "${job.name}" started`) + eventBus.emit('run:started', { jobId: job.id, runId, triggeredBy }) const child = job.command_type === 'inline' ? spawn('/bin/sh', ['-c', job.command], { stdio: ['ignore', 'pipe', 'pipe'] }) @@ -67,13 +69,12 @@ export function run(job, triggeredBy = 'scheduler') { WHERE id = ? `).run(status, exitCode, stdout, stderr, duration, runId) - const maxRuns = Number(process.env.KEEP_MAX_FOR_HISTORY) || 5 db.prepare(` DELETE FROM runs WHERE job_id = ? AND id NOT IN ( SELECT id FROM runs WHERE job_id = ? ORDER BY id DESC LIMIT ? ) - `).run(job.id, job.id, maxRuns) + `).run(job.id, job.id, Number(process.env.KEEP_MAX_FOR_HISTORY) || 5) if (status === 'error' && job.ntfy_enabled && job.ntfy_on_error) { ntfySend(job, { status, exitCode, stderr }).catch(() => {}) @@ -81,6 +82,7 @@ export function run(job, triggeredBy = 'scheduler') { ntfySend(job, { status, exitCode }).catch(() => {}) } + eventBus.emit('run:finished', { jobId: job.id, runId, status, exitCode, duration_ms: duration }) logger.info({ jobId: job.id, runId, status, exitCode, duration }, `Job "${job.name}" finished`) }) diff --git a/server/src/services/eventBus.js b/server/src/services/eventBus.js new file mode 100644 index 0000000..0409875 --- /dev/null +++ b/server/src/services/eventBus.js @@ -0,0 +1,10 @@ +/* + * Copyright (c) 2026 by Christian Kellner. + * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause + */ + +import { EventEmitter } from 'events' + +export const eventBus = new EventEmitter() +eventBus.setMaxListeners(0) + diff --git a/server/tests/events.test.js b/server/tests/events.test.js new file mode 100644 index 0000000..285e6a7 --- /dev/null +++ b/server/tests/events.test.js @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2026 by Christian Kellner. + * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause + */ + +import { describe, it, expect, afterEach, beforeEach } from 'vitest' +import http from 'http' +import { makeApp } from './setup.js' +import { eventBus } from '../src/services/eventBus.js' + +let server +let port + +beforeEach(() => { + server = makeApp().listen(0) + port = server.address().port +}) + +afterEach(async () => { + await new Promise(r => server.close(r)) +}) + +function connectSSE() { + return new Promise((resolve, reject) => { + const req = http.get(`http://localhost:${port}/api/events`, (res) => { + resolve({ req, res }) + }) + req.on('error', (e) => { if (e.code !== 'ECONNRESET') reject(e) }) + setTimeout(() => reject(new Error('SSE connect timeout')), 2000) + }) +} + +function collectData(res, count = 1, timeoutMs = 2000) { + return new Promise((resolve, reject) => { + const chunks = [] + const timer = setTimeout(() => reject(new Error('collectData timeout')), timeoutMs) + res.on('data', (chunk) => { + chunks.push(chunk.toString()) + if (chunks.length >= count) { clearTimeout(timer); resolve(chunks.join('')) } + }) + }) +} + +describe('GET /api/events', () => { + it('responds with SSE headers', async () => { + const { req, res } = await connectSSE() + expect(res.headers['content-type']).toBe('text/event-stream') + expect(res.headers['cache-control']).toBe('no-cache') + req.destroy() + }) + + it('sends run:started event when bus emits', async () => { + const { req, res } = await connectSSE() + const dataPromise = collectData(res) + await new Promise(r => setTimeout(r, 30)) + eventBus.emit('run:started', { jobId: 1, runId: 42, triggeredBy: 'manual' }) + const raw = await dataPromise + expect(raw).toContain('event: run:started') + expect(raw).toContain('"jobId":1') + expect(raw).toContain('"runId":42') + req.destroy() + }) + + it('sends run:finished event when bus emits', async () => { + const { req, res } = await connectSSE() + const dataPromise = collectData(res) + await new Promise(r => setTimeout(r, 30)) + eventBus.emit('run:finished', { jobId: 2, runId: 7, status: 'success', exitCode: 0, duration_ms: 500 }) + const raw = await dataPromise + expect(raw).toContain('event: run:finished') + expect(raw).toContain('"status":"success"') + req.destroy() + }) + + it('removes bus listeners when client disconnects', async () => { + const listenersBefore = eventBus.listenerCount('run:started') + const { req, res } = await connectSSE() + expect(eventBus.listenerCount('run:started')).toBe(listenersBefore + 1) + req.destroy() + res.resume() + await new Promise(r => setTimeout(r, 50)) + expect(eventBus.listenerCount('run:started')).toBe(listenersBefore) + }) +}) diff --git a/server/tests/gatewayToken.test.js b/server/tests/gatewayToken.test.js new file mode 100644 index 0000000..0316ada --- /dev/null +++ b/server/tests/gatewayToken.test.js @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2026 by Christian Kellner. + * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause + */ + +import { describe, it, expect, beforeEach, afterEach } from 'vitest' +import request from 'supertest' +import { makeDb, makeApp } from './setup.js' + +let app + +beforeEach(() => { + makeDb() + app = makeApp() + process.env.GATEWAY_TOKEN = 'secret123' +}) + +afterEach(() => { + delete process.env.GATEWAY_TOKEN +}) + +describe('gatewayTokenMiddleware with query param', () => { + it('accepts valid token from x-gateway-token header', async () => { + const res = await request(app) + .get('/api/jobs') + .set('X-Gateway-Token', 'secret123') + expect(res.status).toBe(200) + }) + + it('accepts valid token from ?token query param', async () => { + const res = await request(app) + .get('/api/jobs?token=secret123') + expect(res.status).toBe(200) + }) + + it('rejects wrong token from query param', async () => { + const res = await request(app) + .get('/api/jobs?token=wrong') + expect(res.status).toBe(401) + }) + + it('rejects request with no token at all', async () => { + const res = await request(app).get('/api/jobs') + expect(res.status).toBe(401) + }) +})