Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 69 additions & 6 deletions apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { db } from '@sim/db'
import { asyncJobs, db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt, sql } from 'drizzle-orm'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'

const logger = createLogger('CleanupStaleExecutions')
Expand Down Expand Up @@ -80,12 +81,74 @@ export async function GET(request: NextRequest) {

logger.info(`Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}`)

// Clean up stale async jobs (stuck in processing)
let asyncJobsMarkedFailed = 0

try {
const staleAsyncJobs = await db
.update(asyncJobs)
.set({
status: JOB_STATUS.FAILED,
completedAt: new Date(),
error: `Job terminated: stuck in processing for more than ${STALE_THRESHOLD_MINUTES} minutes`,
updatedAt: new Date(),
})
.where(
and(eq(asyncJobs.status, JOB_STATUS.PROCESSING), lt(asyncJobs.startedAt, staleThreshold))
)
.returning({ id: asyncJobs.id })

asyncJobsMarkedFailed = staleAsyncJobs.length
if (asyncJobsMarkedFailed > 0) {
logger.info(`Marked ${asyncJobsMarkedFailed} stale async jobs as failed`)
}
} catch (error) {
logger.error('Failed to clean up stale async jobs:', {
error: error instanceof Error ? error.message : String(error),
})
}

// Delete completed/failed jobs older than retention period
const retentionThreshold = new Date(Date.now() - JOB_RETENTION_HOURS * 60 * 60 * 1000)
let asyncJobsDeleted = 0

try {
const deletedJobs = await db
.delete(asyncJobs)
.where(
and(
inArray(asyncJobs.status, [JOB_STATUS.COMPLETED, JOB_STATUS.FAILED]),
lt(asyncJobs.completedAt, retentionThreshold)
)
)
.returning({ id: asyncJobs.id })

asyncJobsDeleted = deletedJobs.length
if (asyncJobsDeleted > 0) {
logger.info(
`Deleted ${asyncJobsDeleted} old async jobs (retention: ${JOB_RETENTION_HOURS}h)`
)
}
} catch (error) {
logger.error('Failed to delete old async jobs:', {
error: error instanceof Error ? error.message : String(error),
})
}
Comment thread
waleedlatif1 marked this conversation as resolved.

return NextResponse.json({
success: true,
found: staleExecutions.length,
cleaned,
failed,
thresholdMinutes: STALE_THRESHOLD_MINUTES,
executions: {
found: staleExecutions.length,
cleaned,
failed,
thresholdMinutes: STALE_THRESHOLD_MINUTES,
},
asyncJobs: {
staleMarkedFailed: asyncJobsMarkedFailed,
oldDeleted: asyncJobsDeleted,
staleThresholdMinutes: STALE_THRESHOLD_MINUTES,
retentionHours: JOB_RETENTION_HOURS,
},
})
} catch (error) {
logger.error('Error in stale execution cleanup job:', error)
Expand Down
86 changes: 34 additions & 52 deletions apps/sim/app/api/jobs/[jobId]/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createLogger } from '@sim/logger'
import { runs } from '@trigger.dev/sdk'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getJobQueue, JOB_STATUS } from '@/lib/core/async-jobs'
import { generateRequestId } from '@/lib/core/utils/request'
import { createErrorResponse } from '@/app/api/workflows/utils'

Expand All @@ -15,8 +15,6 @@ export async function GET(
const requestId = generateRequestId()

try {
logger.debug(`[${requestId}] Getting status for task: ${taskId}`)

const authResult = await checkHybridAuth(request, { requireWorkflowId: false })
if (!authResult.success || !authResult.userId) {
logger.warn(`[${requestId}] Unauthorized task status request`)
Expand All @@ -25,76 +23,60 @@ export async function GET(

const authenticatedUserId = authResult.userId

const run = await runs.retrieve(taskId)
const jobQueue = await getJobQueue()
const job = await jobQueue.getJob(taskId)

logger.debug(`[${requestId}] Task ${taskId} status: ${run.status}`)
if (!job) {
return createErrorResponse('Task not found', 404)
}

const payload = run.payload as any
if (payload?.workflowId) {
if (job.metadata?.workflowId) {
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
const accessCheck = await verifyWorkflowAccess(authenticatedUserId, payload.workflowId)
const accessCheck = await verifyWorkflowAccess(
authenticatedUserId,
job.metadata.workflowId as string
)
if (!accessCheck.hasAccess) {
logger.warn(`[${requestId}] User ${authenticatedUserId} denied access to task ${taskId}`, {
workflowId: payload.workflowId,
})
return createErrorResponse('Access denied', 403)
}
logger.debug(`[${requestId}] User ${authenticatedUserId} has access to task ${taskId}`)
} else {
if (payload?.userId && payload.userId !== authenticatedUserId) {
logger.warn(
`[${requestId}] User ${authenticatedUserId} attempted to access task ${taskId} owned by ${payload.userId}`
)
return createErrorResponse('Access denied', 403)
}
if (!payload?.userId) {
logger.warn(
`[${requestId}] Task ${taskId} has no ownership information in payload. Denying access for security.`
)
logger.warn(`[${requestId}] Access denied to workflow ${job.metadata.workflowId}`)
return createErrorResponse('Access denied', 403)
}
} else if (job.metadata?.userId && job.metadata.userId !== authenticatedUserId) {
logger.warn(`[${requestId}] Access denied to user ${job.metadata.userId}`)
return createErrorResponse('Access denied', 403)
} else if (!job.metadata?.userId && !job.metadata?.workflowId) {
logger.warn(`[${requestId}] Access denied to job ${taskId}`)
return createErrorResponse('Access denied', 403)
}

const statusMap = {
QUEUED: 'queued',
WAITING_FOR_DEPLOY: 'queued',
EXECUTING: 'processing',
RESCHEDULED: 'processing',
FROZEN: 'processing',
COMPLETED: 'completed',
CANCELED: 'cancelled',
FAILED: 'failed',
CRASHED: 'failed',
INTERRUPTED: 'failed',
SYSTEM_FAILURE: 'failed',
EXPIRED: 'failed',
} as const

const mappedStatus = statusMap[run.status as keyof typeof statusMap] || 'unknown'
const mappedStatus = job.status === JOB_STATUS.PENDING ? 'queued' : job.status

const response: any = {
success: true,
taskId,
status: mappedStatus,
metadata: {
startedAt: run.startedAt,
startedAt: job.startedAt,
},
}

if (mappedStatus === 'completed') {
response.output = run.output // This contains the workflow execution results
response.metadata.completedAt = run.finishedAt
response.metadata.duration = run.durationMs
if (job.status === JOB_STATUS.COMPLETED) {
response.output = job.output
response.metadata.completedAt = job.completedAt
if (job.startedAt && job.completedAt) {
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
}
}

if (mappedStatus === 'failed') {
response.error = run.error
response.metadata.completedAt = run.finishedAt
response.metadata.duration = run.durationMs
if (job.status === JOB_STATUS.FAILED) {
response.error = job.error
response.metadata.completedAt = job.completedAt
if (job.startedAt && job.completedAt) {
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
}
}

if (mappedStatus === 'processing' || mappedStatus === 'queued') {
response.estimatedDuration = 180000 // 3 minutes max from our config
if (job.status === JOB_STATUS.PROCESSING || job.status === JOB_STATUS.PENDING) {
response.estimatedDuration = 180000
}

return NextResponse.json(response)
Expand Down
110 changes: 47 additions & 63 deletions apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
import { createLogger } from '@sim/logger'
import { tasks } from '@trigger.dev/sdk'
import { and, eq, isNull, lt, lte, not, or, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { generateRequestId } from '@/lib/core/utils/request'
import { executeScheduleJob } from '@/background/schedule-execution'

Expand Down Expand Up @@ -55,72 +54,57 @@ export async function GET(request: NextRequest) {
logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`)
logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`)

if (isTriggerDevEnabled) {
const triggerPromises = dueSchedules.map(async (schedule) => {
const queueTime = schedule.lastQueuedAt ?? queuedAt

try {
const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: queueTime.toISOString(),
scheduledFor: schedule.nextRunAt?.toISOString(),
}

const handle = await tasks.trigger('schedule-execution', payload)
logger.info(
`[${requestId}] Queued schedule execution task ${handle.id} for workflow ${schedule.workflowId}`
)
return handle
} catch (error) {
logger.error(
`[${requestId}] Failed to trigger schedule execution for workflow ${schedule.workflowId}`,
error
)
return null
}
})

await Promise.allSettled(triggerPromises)

logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions to Trigger.dev`)
} else {
const directExecutionPromises = dueSchedules.map(async (schedule) => {
const queueTime = schedule.lastQueuedAt ?? queuedAt

const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: queueTime.toISOString(),
scheduledFor: schedule.nextRunAt?.toISOString(),
}

void executeScheduleJob(payload).catch((error) => {
logger.error(
`[${requestId}] Direct schedule execution failed for workflow ${schedule.workflowId}`,
error
)
const jobQueue = await getJobQueue()

const queuePromises = dueSchedules.map(async (schedule) => {
const queueTime = schedule.lastQueuedAt ?? queuedAt

const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: queueTime.toISOString(),
scheduledFor: schedule.nextRunAt?.toISOString(),
}

try {
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
metadata: { workflowId: schedule.workflowId },
})

logger.info(
`[${requestId}] Queued direct schedule execution for workflow ${schedule.workflowId} (Trigger.dev disabled)`
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
)
})

await Promise.allSettled(directExecutionPromises)
if (shouldExecuteInline()) {
void (async () => {
try {
await jobQueue.startJob(jobId)
const output = await executeScheduleJob(payload)
await jobQueue.completeJob(jobId, output)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logger.error(
`[${requestId}] Schedule execution failed for workflow ${schedule.workflowId}`,
{ jobId, error: errorMessage }
)
await jobQueue.markJobFailed(jobId, errorMessage)
}
})()
}
} catch (error) {
logger.error(
`[${requestId}] Failed to queue schedule execution for workflow ${schedule.workflowId}`,
error
)
}
})

await Promise.allSettled(queuePromises)

logger.info(
`[${requestId}] Queued ${dueSchedules.length} direct schedule executions (Trigger.dev disabled)`
)
}
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions`)

return NextResponse.json({
message: 'Scheduled workflow executions processed',
Expand Down
Loading