Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions client/src/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import { DeleteConfirm } from './components/jobs/DeleteConfirm.jsx'
import { RunHistoryPanel } from './components/runs/RunHistoryPanel.jsx'
import { UnauthorizedPage } from './components/UnauthorizedPage.jsx'
import { useJobs } from './hooks/useJobs.js'
import { useRunHistory } from './hooks/useRunHistory.js'
import { useJobEvents } from './hooks/useJobEvents.js'
import { useToast } from './components/ui/Toast.jsx'

export default function App() {
Expand All @@ -27,17 +29,38 @@ export default function App() {
.catch(() => setAuthState('ok'))
}, [])

const { jobs, isLoading, error, createJob, updateJob, deleteJob, toggleJob, triggerRun } = useJobs()
const {
jobs, isLoading, error,
createJob, updateJob, deleteJob, toggleJob, triggerRun,
updateRunStarted, updateRunFinished,
} = useJobs()
const { addToast } = useToast()

const [dialogState, setDialogState] = useState(null)
const [deleteTarget, setDeleteTarget] = useState(null)
const [historyJob, setHistoryJob] = useState(null)

const runHistory = useRunHistory(historyJob?.id)

useJobEvents({
onRunStarted: updateRunStarted,
onRunFinished: (data) => {
updateRunFinished(data)
runHistory.handleRunFinished(data)
},
})

const handleNew = () => setDialogState({ mode: 'create' })
const handleEdit = (job) => setDialogState({ mode: 'edit', job })
const handleDelete = (job) => setDeleteTarget(job)
const handleHistory = (job) => setHistoryJob(job)
const handleHistory = (job) => {
setHistoryJob(prev => {
// When the same job is re-selected the jobId doesn't change, so useRunHistory's
// useEffect won't re-fire. Refresh explicitly for that case only.
if (prev?.id === job.id) runHistory.refresh()
return job
})
}

const handleSave = async (formData) => {
if (dialogState.mode === 'create') {
Expand Down Expand Up @@ -72,7 +95,17 @@ export default function App() {
if (authState === 'unauthorized') return <UnauthorizedPage />

const sidebar = historyJob ? (
<RunHistoryPanel job={historyJob} onClose={() => setHistoryJob(null)} />
<RunHistoryPanel
job={historyJob}
runs={runHistory.runs}
total={runHistory.total}
isLoading={runHistory.isLoading}
error={runHistory.error}
loadMore={runHistory.loadMore}
hasMore={runHistory.hasMore}
refresh={runHistory.refresh}
onClose={() => setHistoryJob(null)}
/>
) : null

return (
Expand Down
5 changes: 1 addition & 4 deletions client/src/components/runs/RunHistoryPanel.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
*/

import { X, RefreshCw, History } from 'lucide-react'
import { useRunHistory } from '../../hooks/useRunHistory.js'
import { RunRecord } from './RunRecord.jsx'
import { LoadingSpinner } from '../ui/LoadingSpinner.jsx'
import { EmptyState } from '../ui/EmptyState.jsx'
import { Button } from '../ui/Button.jsx'
import { Tooltip } from '../ui/Tooltip.jsx'

export function RunHistoryPanel({ job, onClose }) {
const { runs, total, isLoading, error, loadMore, hasMore, refresh } = useRunHistory(job?.id)

export function RunHistoryPanel({ job, runs, total, isLoading, error, loadMore, hasMore, refresh, onClose }) {
return (
<div className="flex flex-col h-full bg-[#161616]">
<div className="flex items-center justify-between px-4 py-3 border-b border-[#2a2a2a] shrink-0 bg-[#0d0d0d]">
Expand Down
47 changes: 47 additions & 0 deletions client/src/hooks/useJobEvents.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2026 by Christian Kellner.
* Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
*/

import { useEffect, useRef, useState } from 'react'

const BASE = import.meta.env.VITE_API_BASE ?? '/api'

function getToken() {
return new URLSearchParams(window.location.search).get('token') ?? ''
}

export function useJobEvents({ onRunStarted, onRunFinished }) {
const [connected, setConnected] = useState(false)
const onStartedRef = useRef(onRunStarted)
const onFinishedRef = useRef(onRunFinished)

// Sync latest callbacks each render so the EventSource effect below never goes stale
useEffect(() => {
onStartedRef.current = onRunStarted
onFinishedRef.current = onRunFinished
})

useEffect(() => {
const token = getToken()
const url = `${BASE}/events${token ? `?token=${encodeURIComponent(token)}` : ''}`
const es = new EventSource(url)

es.onopen = () => setConnected(true)
// EventSource auto-retries on transient errors; connected returns to true on onopen
es.onerror = () => setConnected(false)
es.addEventListener('run:started', (e) => {
try { onStartedRef.current(JSON.parse(e.data)) } catch { /* malformed payload */ }
})
es.addEventListener('run:finished', (e) => {
try { onFinishedRef.current(JSON.parse(e.data)) } catch { /* malformed payload */ }
})

return () => {
es.close()
setConnected(false)
}
}, [])

return { connected }
}
15 changes: 14 additions & 1 deletion client/src/hooks/useJobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,18 @@ export function useJobs() {
return res.data
}, [])

return { jobs, isLoading, error, createJob, updateJob, deleteJob, toggleJob, triggerRun, refresh: fetchJobs }
const updateRunStarted = useCallback(({ jobId }) => {
setJobs(prev => prev.map(j => j.id === jobId ? { ...j, last_run_status: 'running' } : j))
}, [])

const updateRunFinished = useCallback(({ jobId, status }) => {
setJobs(prev => prev.map(j => j.id === jobId ? { ...j, last_run_status: status } : j))
}, [])

return {
jobs, isLoading, error,
createJob, updateJob, deleteJob, toggleJob, triggerRun,
refresh: fetchJobs,
updateRunStarted, updateRunFinished,
}
}
12 changes: 11 additions & 1 deletion client/src/hooks/useRunHistory.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,15 @@ export function useRunHistory(jobId) {
fetchRuns(offset + limit)
}, [fetchRuns, offset, limit])

return { runs, total, isLoading, error, loadMore, hasMore: runs.length < total, refresh: () => fetchRuns(0) }
// Intentionally resets to page 1 so the new run appears at the top
const handleRunFinished = useCallback((data) => {
if (data.jobId === jobId) fetchRuns(0)
}, [jobId, fetchRuns])

return {
runs, total, isLoading, error,
loadMore, hasMore: runs.length < total,
refresh: () => fetchRuns(0),
handleRunFinished,
}
}
35 changes: 35 additions & 0 deletions server/src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { createJobsRouter } from './routes/jobs.js'
import { createRunsRouter } from './routes/runs.js'
import { errorHandler } from './middleware/errorHandler.js'
import { gatewayTokenMiddleware } from './middleware/gatewayToken.js'
import { eventBus } from './services/eventBus.js'

const pkg = JSON.parse(fs.readFileSync(new URL('../../package.json', import.meta.url), 'utf8'))

Expand All @@ -32,6 +33,40 @@ export function createApp(scheduler) {
app.use('/api/jobs', createJobsRouter(scheduler))
app.use('/api/runs', createRunsRouter())

app.get('/api/events', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
res.flushHeaders()

function send(eventName, data) {
if (!res.writableEnded) {
res.write(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
}
}

const onStarted = (data) => send('run:started', data)
const onFinished = (data) => send('run:finished', data)

eventBus.on('run:started', onStarted)
eventBus.on('run:finished', onFinished)

let cleaned = false
function cleanup() {
if (cleaned) return
cleaned = true
eventBus.off('run:started', onStarted)
eventBus.off('run:finished', onFinished)
}

req.on('close', cleanup)
if (res.socket) {
res.socket.on('end', cleanup)
res.socket.on('close', cleanup)
}
})

app.get('/api/validate-path', (req, res) => {
const filePath = req.query.path || ''
if (!filePath.trim()) return res.json({ exists: false, isFile: false, executable: false })
Expand Down
3 changes: 2 additions & 1 deletion server/src/middleware/gatewayToken.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ export function gatewayTokenMiddleware(req, res, next) {
const token = process.env.GATEWAY_TOKEN
if (!token) return next()

const provided = req.headers['x-gateway-token']
// EventSource cannot set custom headers, so SSE clients send the token as ?token=...
const provided = req.headers['x-gateway-token'] || req.query.token || ''
if (!provided) {
return res.status(401).json({ error: 'Unauthorized: invalid or missing gateway token' })
}
Expand Down
6 changes: 4 additions & 2 deletions server/src/scheduler/executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { spawn } from 'child_process'
import { getDb } from '../db/index.js'
import { send as ntfySend } from '../services/ntfy.js'
import { logger } from '../logger.js'
import { eventBus } from '../services/eventBus.js'

const MAX_OUTPUT_BYTES = 512 * 1024

Expand All @@ -21,6 +22,7 @@ export function run(job, triggeredBy = 'scheduler') {

const runId = runResult.lastInsertRowid
logger.info({ jobId: job.id, runId, triggeredBy }, `Job "${job.name}" started`)
eventBus.emit('run:started', { jobId: job.id, runId, triggeredBy })

const child = job.command_type === 'inline'
? spawn('/bin/sh', ['-c', job.command], { stdio: ['ignore', 'pipe', 'pipe'] })
Expand Down Expand Up @@ -67,20 +69,20 @@ export function run(job, triggeredBy = 'scheduler') {
WHERE id = ?
`).run(status, exitCode, stdout, stderr, duration, runId)

const maxRuns = Number(process.env.KEEP_MAX_FOR_HISTORY) || 5
db.prepare(`
DELETE FROM runs
WHERE job_id = ? AND id NOT IN (
SELECT id FROM runs WHERE job_id = ? ORDER BY id DESC LIMIT ?
)
`).run(job.id, job.id, maxRuns)
`).run(job.id, job.id, Number(process.env.KEEP_MAX_FOR_HISTORY) || 5)

if (status === 'error' && job.ntfy_enabled && job.ntfy_on_error) {
ntfySend(job, { status, exitCode, stderr }).catch(() => {})
} else if (status === 'success' && job.ntfy_enabled && job.ntfy_on_run) {
ntfySend(job, { status, exitCode }).catch(() => {})
}

eventBus.emit('run:finished', { jobId: job.id, runId, status, exitCode, duration_ms: duration })
logger.info({ jobId: job.id, runId, status, exitCode, duration }, `Job "${job.name}" finished`)
})

Expand Down
10 changes: 10 additions & 0 deletions server/src/services/eventBus.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright (c) 2026 by Christian Kellner.
* Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
*/

import { EventEmitter } from 'events'

export const eventBus = new EventEmitter()
eventBus.setMaxListeners(0)

84 changes: 84 additions & 0 deletions server/tests/events.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2026 by Christian Kellner.
* Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
*/

import { describe, it, expect, afterEach, beforeEach } from 'vitest'
import http from 'http'
import { makeApp } from './setup.js'
import { eventBus } from '../src/services/eventBus.js'

let server
let port

beforeEach(() => {
server = makeApp().listen(0)
port = server.address().port
})

afterEach(async () => {
await new Promise(r => server.close(r))
})

function connectSSE() {
return new Promise((resolve, reject) => {
const req = http.get(`http://localhost:${port}/api/events`, (res) => {
resolve({ req, res })
})
req.on('error', (e) => { if (e.code !== 'ECONNRESET') reject(e) })
setTimeout(() => reject(new Error('SSE connect timeout')), 2000)
})
}

function collectData(res, count = 1, timeoutMs = 2000) {
return new Promise((resolve, reject) => {
const chunks = []
const timer = setTimeout(() => reject(new Error('collectData timeout')), timeoutMs)
res.on('data', (chunk) => {
chunks.push(chunk.toString())
if (chunks.length >= count) { clearTimeout(timer); resolve(chunks.join('')) }
})
})
}

describe('GET /api/events', () => {
it('responds with SSE headers', async () => {
const { req, res } = await connectSSE()
expect(res.headers['content-type']).toBe('text/event-stream')
expect(res.headers['cache-control']).toBe('no-cache')
req.destroy()
})

it('sends run:started event when bus emits', async () => {
const { req, res } = await connectSSE()
const dataPromise = collectData(res)
await new Promise(r => setTimeout(r, 30))
eventBus.emit('run:started', { jobId: 1, runId: 42, triggeredBy: 'manual' })
const raw = await dataPromise
expect(raw).toContain('event: run:started')
expect(raw).toContain('"jobId":1')
expect(raw).toContain('"runId":42')
req.destroy()
})

it('sends run:finished event when bus emits', async () => {
const { req, res } = await connectSSE()
const dataPromise = collectData(res)
await new Promise(r => setTimeout(r, 30))
eventBus.emit('run:finished', { jobId: 2, runId: 7, status: 'success', exitCode: 0, duration_ms: 500 })
const raw = await dataPromise
expect(raw).toContain('event: run:finished')
expect(raw).toContain('"status":"success"')
req.destroy()
})

it('removes bus listeners when client disconnects', async () => {
const listenersBefore = eventBus.listenerCount('run:started')
const { req, res } = await connectSSE()
expect(eventBus.listenerCount('run:started')).toBe(listenersBefore + 1)
req.destroy()
res.resume()
await new Promise(r => setTimeout(r, 50))
expect(eventBus.listenerCount('run:started')).toBe(listenersBefore)
})
})
Loading
Loading