Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,86 @@
# execution_scripts
# execution_scripts

Shell scripts for submitting and managing Spark workloads across multiple execution engines.

## Supported Engines

| Engine | Directory | Description |
|--------|-----------|-------------|
| **Databricks** | [`databricks/`](databricks/) | Trigger existing jobs or run notebooks via the Databricks Jobs API 2.1 |
| **Spark Standalone** | [`spark-standalone/`](spark-standalone/) | Start/stop a local Spark cluster and submit jobs via `spark-submit` |
| **Google Cloud Dataproc** | [`dataproc/`](dataproc/) | Create/delete Dataproc clusters and submit Spark/PySpark/Hadoop jobs via `gcloud` |

## Quick Start

### Databricks

```bash
export DATABRICKS_HOST="https://<workspace>.azuredatabricks.net"
export DATABRICKS_TOKEN="dapi..."

# Trigger an existing job and wait for completion
./databricks/run_job.sh --job-id 42 --wait

# Run a notebook once on an existing cluster
./databricks/submit_notebook.sh \
--notebook-path /Shared/etl/my_notebook \
--cluster-id <cluster-id> \
--wait
```

### Spark Standalone

```bash
export SPARK_HOME=/opt/spark

# Start master + 2 workers
./spark-standalone/start_cluster.sh --workers 2 --worker-cores 4 --worker-memory 8g

# Submit a PySpark job
./spark-standalone/submit_job.sh --app /path/to/etl.py

# Stop the cluster
./spark-standalone/stop_cluster.sh
```

### Google Cloud Dataproc

```bash
export GCP_PROJECT="my-gcp-project"
export GCP_REGION="us-central1"

# Create a cluster
./dataproc/create_cluster.sh --cluster-name my-cluster --num-workers 4

# Submit a PySpark job
./dataproc/submit_job.sh \
--cluster-name my-cluster \
--job-type pyspark \
--app gs://my-bucket/etl.py \
-- --date 2024-01-01

# Delete the cluster
./dataproc/delete_cluster.sh --cluster-name my-cluster --yes
```

## Repository Structure

```
execution_scripts/
├── databricks/
│ ├── README.md
│ ├── run_job.sh # Trigger an existing Databricks job
│ └── submit_notebook.sh # Run a notebook as a one-time run
├── spark-standalone/
│ ├── README.md
│ ├── start_cluster.sh # Start Spark master + workers
│ ├── stop_cluster.sh # Stop the cluster
│ └── submit_job.sh # Submit a job via spark-submit
└── dataproc/
├── README.md
├── create_cluster.sh # Create a Dataproc cluster
├── delete_cluster.sh # Delete a Dataproc cluster
└── submit_job.sh # Submit a job to Dataproc
```

For engine-specific usage and options, refer to the README in each subdirectory.
87 changes: 87 additions & 0 deletions databricks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Databricks Execution Scripts

Shell scripts to interact with Databricks workspaces via the **Jobs API 2.1** and the `runs/submit` endpoint.

## Prerequisites

| Tool | Purpose |
|------|---------|
| `curl` | HTTP calls to the Databricks REST API |
| `jq` | JSON parsing / construction |
| `python3` | URL-encoding job names in `run_job.sh` |

Set the following environment variables before running any script:

```bash
export DATABRICKS_HOST="https://<workspace>.azuredatabricks.net"
export DATABRICKS_TOKEN="dapi..." # personal-access token or SP OAuth token
```

---

## Scripts

### `run_job.sh` – Trigger an existing job

Triggers a run for an existing Databricks job (by ID or name) and optionally waits for completion.

```
Usage: ./run_job.sh [OPTIONS]

Options:
-j, --job-id <id> Existing job ID to trigger
-n, --job-name <name> Look up job ID by name
-p, --params <json> Notebook/task parameters as a JSON string
-w, --wait Block until the run finishes
-t, --timeout <seconds> Polling timeout (default: 3600)
-h, --help Show help
```

**Examples:**

```bash
# Trigger job by ID and wait
./run_job.sh --job-id 42 --wait

# Trigger job by name with parameters
./run_job.sh --job-name "nightly_etl" \
--params '{"date":"2024-01-01","env":"prod"}' \
--wait
```

---

### `submit_notebook.sh` – Run a notebook as a one-time run

Submits a notebook to run immediately on an existing or a new ephemeral cluster.

```
Usage: ./submit_notebook.sh [OPTIONS]

Options:
-n, --notebook-path <path> Workspace path to the notebook (required)
-c, --cluster-id <id> Existing cluster ID
--new-cluster <json> New-cluster specification JSON
-p, --params <json> Notebook base parameters
-r, --run-name <name> Human-readable run name
-w, --wait Block until the run finishes
-t, --timeout <seconds> Polling timeout (default: 3600)
-h, --help Show help
```

**Examples:**

```bash
# Run a notebook on an existing cluster
./submit_notebook.sh \
--notebook-path /Shared/etl/transform \
--cluster-id 1234-567890-abc12345 \
--params '{"date":"2024-01-01"}' \
--wait

# Run a notebook on a new ephemeral cluster
./submit_notebook.sh \
--notebook-path /Shared/etl/transform \
--new-cluster '{"num_workers":4,"spark_version":"14.3.x-scala2.12","node_type_id":"m5d.xlarge"}' \
--wait
```
130 changes: 130 additions & 0 deletions databricks/run_job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#!/usr/bin/env bash
# run_job.sh – Submit and optionally wait for a Databricks job run.
#
# Usage:
# ./run_job.sh [OPTIONS]
#
# Required environment variables (or pass via flags):
# DATABRICKS_HOST – Workspace URL, e.g. https://<workspace>.azuredatabricks.net
# DATABRICKS_TOKEN – Personal-access token (or SP OAuth token)
#
# Options:
# -j, --job-id <id> Existing job ID to trigger a run
# -n, --job-name <name> Look up job ID by name (mutually exclusive with -j)
# -p, --params <json> Notebook/task parameters as a JSON string (optional)
# -w, --wait Block until the run finishes and exit with its result code
# -t, --timeout <seconds> Polling timeout when --wait is set (default: 3600)
# -h, --help Show this help message
#
# Examples:
# DATABRICKS_HOST=https://adb-xxx.azuredatabricks.net \
# DATABRICKS_TOKEN=dapi... \
# ./run_job.sh --job-id 42 --wait
#
# ./run_job.sh --job-name "nightly_etl" --params '{"date":"2024-01-01"}' --wait

set -euo pipefail

# ── defaults ───────────────────────────────────────────────────────────────────
WAIT=false
TIMEOUT=3600
POLL_INTERVAL=15
JOB_ID=""
JOB_NAME=""
PARAMS=""

# ── helpers ────────────────────────────────────────────────────────────────────
usage() {
grep '^#' "$0" | sed 's/^# \{0,1\}//'
exit 0
}

die() { echo "ERROR: $*" >&2; exit 1; }

require_cmd() { command -v "$1" >/dev/null 2>&1 || die "'$1' is required but not installed."; }

urlencode() { python3 -c "import urllib.parse,sys; print(urllib.parse.quote(sys.argv[1]))" "$1"; }

api_call() {
local method="$1" path="$2" data="${3:-}"
local url="${DATABRICKS_HOST%/}/api/2.1${path}"
if [[ -n "$data" ]]; then
curl -sSf -X "$method" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
-H "Content-Type: application/json" \
-d "$data" "$url"
else
curl -sSf -X "$method" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
"$url"
fi
}

# ── argument parsing ───────────────────────────────────────────────────────────
while [[ $# -gt 0 ]]; do
case "$1" in
-j|--job-id) JOB_ID="$2"; shift 2 ;;
-n|--job-name) JOB_NAME="$2"; shift 2 ;;
-p|--params) PARAMS="$2"; shift 2 ;;
-w|--wait) WAIT=true; shift ;;
-t|--timeout) TIMEOUT="$2"; shift 2 ;;
-h|--help) usage ;;
*) die "Unknown option: $1" ;;
esac
done

# ── validation ─────────────────────────────────────────────────────────────────
require_cmd curl
require_cmd jq
require_cmd python3

[[ -z "${DATABRICKS_HOST:-}" ]] && die "DATABRICKS_HOST is not set."
[[ -z "${DATABRICKS_TOKEN:-}" ]] && die "DATABRICKS_TOKEN is not set."
[[ -z "$JOB_ID" && -z "$JOB_NAME" ]] && die "Provide --job-id or --job-name."
[[ -n "$JOB_ID" && -n "$JOB_NAME" ]] && die "--job-id and --job-name are mutually exclusive."

# ── resolve job name → job ID ──────────────────────────────────────────────────
if [[ -n "$JOB_NAME" ]]; then
echo "Looking up job ID for name: '${JOB_NAME}' …"
JOBS_JSON=$(api_call GET "/jobs/list?name=$(urlencode "$JOB_NAME")")
JOB_ID=$(echo "$JOBS_JSON" | jq -r '.jobs[0].job_id // empty')
[[ -z "$JOB_ID" ]] && die "No job found with name '${JOB_NAME}'."
echo "Resolved job ID: ${JOB_ID}"
fi

# ── trigger run ────────────────────────────────────────────────────────────────
PAYLOAD="{\"job_id\": ${JOB_ID}}"
if [[ -n "$PARAMS" ]]; then
PAYLOAD=$(echo "$PAYLOAD" | jq --argjson p "$PARAMS" '. + {notebook_params: $p}')
fi

echo "Triggering run for job ID ${JOB_ID} …"
RUN_RESPONSE=$(api_call POST "/jobs/run-now" "$PAYLOAD")
RUN_ID=$(echo "$RUN_RESPONSE" | jq -r '.run_id')
echo "Run ID: ${RUN_ID}"
echo "Run URL: ${DATABRICKS_HOST%/}/#job/${JOB_ID}/run/${RUN_ID}"

# ── optional wait ──────────────────────────────────────────────────────────────
if [[ "$WAIT" == "true" ]]; then
echo "Waiting for run ${RUN_ID} to complete (timeout: ${TIMEOUT}s) …"
ELAPSED=0
while true; do
RUN_STATE=$(api_call GET "/jobs/runs/get?run_id=${RUN_ID}" | jq -r '.state')
LIFE_CYCLE=$(echo "$RUN_STATE" | jq -r '.life_cycle_state')
RESULT_STATE=$(echo "$RUN_STATE" | jq -r '.result_state // "N/A"')
echo " [${ELAPSED}s] lifecycle=${LIFE_CYCLE} result=${RESULT_STATE}"

if [[ "$LIFE_CYCLE" == "TERMINATED" || "$LIFE_CYCLE" == "SKIPPED" || "$LIFE_CYCLE" == "INTERNAL_ERROR" ]]; then
echo "Run finished with result state: ${RESULT_STATE}"
[[ "$RESULT_STATE" == "SUCCESS" ]] && exit 0
exit 1
fi

if [[ "$ELAPSED" -ge "$TIMEOUT" ]]; then
die "Timed out after ${TIMEOUT}s waiting for run ${RUN_ID}."
fi

sleep "$POLL_INTERVAL"
ELAPSED=$(( ELAPSED + POLL_INTERVAL ))
done
fi
Loading