Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ __pycache__/
*.dot
.pyre
*et_def.pb.cc
*et_def.pb.h
*et_def.pb.h

trace_collection/post_execution/traces

79 changes: 79 additions & 0 deletions trace_collection/post_execution/combine_trace.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env bash
#
# Post-process the traces collected by simple_onerank.py / simple_multirank.py:
#   1. chakra_trace_link : link each rank's host and device trace into one JSON
#   2. chakra_converter  : convert each linked JSON into a protobuf .et file
#   3. chakra_jsonizer   : dump each .et file back to JSON
#
# All inputs and outputs live in TRACE_DIR. It defaults to the "traces"
# directory next to this script and can be overridden from the environment.

# -e: stop at the first failing command, -x: echo commands as they run,
# -o pipefail: a pipeline fails when any of its stages fails.
# The option name must follow -o; a bare "pipefail" word would only be stored
# as positional parameter $1 and leave the option disabled.
set -exo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TRACE_DIR="${TRACE_DIR:-${SCRIPT_DIR}/traces}"

# ---------------------------------------------------------------------------
# Validate inputs
# ---------------------------------------------------------------------------
if [[ ! -d "${TRACE_DIR}" ]]; then
    echo "[ERROR] Trace directory not found: ${TRACE_DIR}" >&2
    exit 1
fi

# Automatically detect the number of ranks from the host.<rank>.json files.
# A glob array is used instead of `ls | wc -l` so that "no match" yields an
# empty array rather than a failing pipeline (which would abort under -e).
shopt -s nullglob
HOST_TRACES=("${TRACE_DIR}"/host.*.json)
shopt -u nullglob
NUM_RANKS="${#HOST_TRACES[@]}"
if [[ "${NUM_RANKS}" -eq 0 ]]; then
    echo "[ERROR] No host.*.json files found in ${TRACE_DIR}" >&2
    exit 1
fi
echo "[INFO] Found ${NUM_RANKS} rank(s) in ${TRACE_DIR}"

# ---------------------------------------------------------------------------
# Step 1: chakra_trace_link (host + device → linked JSON in TRACE_DIR)
# ---------------------------------------------------------------------------
echo ""
echo "=== Step 1: chakra_trace_link ==="
for ((rank = 0; rank < NUM_RANKS; rank++)); do
    HOST_TRACE="${TRACE_DIR}/host.${rank}.json"
    DEVICE_TRACE="${TRACE_DIR}/device.${rank}.json"
    LINKED_OUT="${TRACE_DIR}/linked.${rank}.json"

    # Ranks are assumed to be numbered 0..NUM_RANKS-1; report a gap or a
    # missing device trace clearly instead of letting the tool fail on it.
    for required in "${HOST_TRACE}" "${DEVICE_TRACE}"; do
        if [[ ! -f "${required}" ]]; then
            echo "[ERROR] Missing trace file for rank ${rank}: ${required}" >&2
            exit 1
        fi
    done

    echo "[rank ${rank}] Linking ${HOST_TRACE} + ${DEVICE_TRACE} -> ${LINKED_OUT}"
    chakra_trace_link \
        --chakra-host-trace "${HOST_TRACE}" \
        --chakra-device-trace "${DEVICE_TRACE}" \
        --rank "${rank}" \
        --output-file "${LINKED_OUT}"
done
echo "[INFO] All ranks linked."

# ---------------------------------------------------------------------------
# Step 2: chakra_converter (linked JSON → protobuf .et in TRACE_DIR)
# ASTRA-sim expects files named {prefix}.{npu_id}.et
# e.g. chakra_trace.0.et, chakra_trace.1.et, ...
# ---------------------------------------------------------------------------
echo ""
echo "=== Step 2: chakra_converter ==="
for ((rank = 0; rank < NUM_RANKS; rank++)); do
    LINKED_IN="${TRACE_DIR}/linked.${rank}.json"
    ET_OUT="${TRACE_DIR}/chakra_trace.${rank}.et"

    echo "[rank ${rank}] Converting ${LINKED_IN} -> ${ET_OUT}"
    chakra_converter --log-filename /dev/null \
        PyTorch \
        --input "${LINKED_IN}" \
        --output "${ET_OUT}"
done
echo "[INFO] All ranks converted."

# ---------------------------------------------------------------------------
# Step 3: chakra_jsonizer (protobuf .et to JSON in TRACE_DIR)
# ---------------------------------------------------------------------------
echo ""
echo "=== Step 3: chakra_jsonizer ==="
for ((rank = 0; rank < NUM_RANKS; rank++)); do
    ET_OUT="${TRACE_DIR}/chakra_trace.${rank}.et"
    JSON_OUT="${TRACE_DIR}/chakra_trace.${rank}.json"

    echo "[rank ${rank}] Converting ${ET_OUT} -> ${JSON_OUT}"
    chakra_jsonizer \
        --input "${ET_OUT}" \
        --output "${JSON_OUT}"
done
echo "[INFO] All ranks converted."

echo ""
echo "=== Done ==="
echo " Files: chakra_trace.0.et ... chakra_trace.$((NUM_RANKS-1)).et"
echo ""
37 changes: 37 additions & 0 deletions trace_collection/post_execution/sample_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Simple neural network model for post-execution trace collection."""

import torch
import torch.nn as nn


class SampleModel(nn.Module):
    """Two-layer perceptron used as the workload for trace collection.

    The network computes ``linear2(relu(linear(x)))``.
    """

    def __init__(
        self,
        input_size: int = 1024,
        hidden_size: int = 256 * 1024,
        output_size: int = 256,
    ) -> None:
        """
        Build the layers of the network.

        Args:
            input_size: Number of features in each input sample
            hidden_size: Width of the hidden layer
            output_size: Number of features in each output sample
        """
        super().__init__()
        self.linear = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.linear2 = nn.Linear(in_features=hidden_size, out_features=output_size)
        self.relu = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Apply the network to ``x``.

        Args:
            x: Input tensor whose last dimension is ``input_size``

        Returns:
            Output of the second linear layer; its last dimension is ``output_size``
        """
        hidden = self.relu(self.linear(x))
        return self.linear2(hidden)
118 changes: 118 additions & 0 deletions trace_collection/post_execution/simple_multirank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import os

import torch
import torch.distributed as dist
import torch.nn as nn
from sample_model import SampleModel
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.profiler import ExecutionTraceObserver, profile

"""Simple script to collect post-execution traces with multiple ranks via torchrun."""

"""
# When running across multiple nodes
mpirun -np 4 -N 2 \
torchrun \
--nnodes=2 \
--nproc_per_node=2 \
--rdzv_backend=c10d \
--rdzv_endpoint=<node0-ip>:29501 \
simple_multirank.py
"""
"""OR"""
"""
# When running on one node
torchrun \
--nproc_per_node=2 \
simple_multirank.py
"""

def main() -> None:
    """Run 10 DDP training iterations on this rank and record its traces.

    The process joins the default process group (NCCL when CUDA is available,
    Gloo otherwise), wraps ``SampleModel`` in ``DistributedDataParallel`` and
    trains on random data while profiling. Two files are written per rank:

    * ``traces/host.{rank}.json``   - written by ``ExecutionTraceObserver``
    * ``traces/device.{rank}.json`` - chrome trace exported by the profiler

    The output directory is relative to the current working directory.
    """
    batch_size = 64
    input_size = 1024
    output_size = 256
    output_path = "traces"

    # torchrun provides these environment variables for each process.
    local_rank = int(os.environ.get("LOCAL_RANK", "0"))
    use_cuda = torch.cuda.is_available()
    if use_cuda:
        backend = "nccl"
        torch.cuda.set_device(local_rank)
        device = f"cuda:{local_rank}"
    else:
        backend = "gloo"
        device = "cpu"

    dist.init_process_group(backend=backend)
    try:
        rank = dist.get_rank()
        world_size = dist.get_world_size()

        model = SampleModel()
        criterion = nn.MSELoss()
        model.to(device)
        if use_cuda:  # noqa: SIM108
            model = DDP(model, device_ids=[local_rank], output_device=local_rank)
        else:
            model = DDP(model)
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

        # Initializing Host profiler and Kineto profiler
        wait_iters = 0
        warmup_iters = 5
        active_iters = 5
        total_steps = wait_iters + warmup_iters + active_iters

        os.makedirs(output_path, exist_ok=True)

        et = ExecutionTraceObserver()
        et.register_callback(f"{output_path}/host.{rank}.json")

        def device_trace_handler(prof: profile) -> None:
            """Export the device (Kineto) trace of this rank as a chrome trace."""
            prof.export_chrome_trace(f"{output_path}/device.{rank}.json")

        activities = [torch.profiler.ProfilerActivity.CPU]
        if use_cuda:
            activities.append(torch.profiler.ProfilerActivity.CUDA)

        with profile(
            activities=activities,
            schedule=torch.profiler.schedule(wait=wait_iters, warmup=warmup_iters, active=active_iters),
            on_trace_ready=device_trace_handler,
            record_shapes=True,
            execution_trace_observer=et,
        ) as prof:
            print(f"Starting training loop on rank {rank}/{world_size} using {device}...")
            for step_id in range(total_steps):
                # Generate random input data
                x = torch.randn(batch_size, input_size, device=device)
                target = torch.randn(batch_size, output_size, device=device)

                # Forward pass
                output = model(x)

                # Compute loss
                loss = criterion(output, target)

                # Backward pass
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                if use_cuda:
                    # Ensure all CUDA operations are complete before moving to the next step.
                    torch.cuda.synchronize()

                print(f"Rank {rank} Iteration {step_id + 1}/{total_steps}, Loss: {loss.item():.4f}")

                # Mark the end of a step
                prof.step()

        et.stop()
        et.unregister_callback()
        # Wait for every rank to finish before tearing the group down.
        dist.barrier()
    finally:
        # Release the process group even when setup or training raised, so a
        # failing rank does not leave communicator resources behind.
        dist.destroy_process_group()
    print(f"Rank {rank} training complete!")


# Run the training loop only when executed as a script (e.g. via torchrun).
if __name__ == "__main__":
    main()
84 changes: 84 additions & 0 deletions trace_collection/post_execution/simple_onerank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import os

import torch
import torch.nn as nn
from sample_model import SampleModel
from torch.profiler import ExecutionTraceObserver, profile

"""Simple script to collect post-execution traces with one rank only(=compute only)."""

"""
python simple_onerank.py
"""

def main() -> None:
    """Run 10 single-process training iterations and record their traces.

    ``SampleModel`` is trained on random data (on CUDA when available,
    otherwise on CPU) while profiling. Two files are written:

    * ``traces/host.0.json``   - written by ``ExecutionTraceObserver``
    * ``traces/device.0.json`` - chrome trace exported by the profiler

    The output directory is relative to the current working directory.
    """
    batch_size = 64
    input_size = 1024
    output_size = 256
    output_path = "traces"
    use_cuda = torch.cuda.is_available()
    device = "cuda" if use_cuda else "cpu"

    model = SampleModel()
    # Move the model to its device before building the optimizer, as the
    # torch.optim documentation recommends, so the optimizer is constructed
    # over the parameters in their final location.
    model.to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.MSELoss()

    # Initializing Host profiler and Kineto profiler
    rank = 0  # Single rank for compute-only
    wait_iters = 0
    warmup_iters = 5
    active_iters = 5
    total_steps = wait_iters + warmup_iters + active_iters

    os.makedirs(output_path, exist_ok=True)

    et = ExecutionTraceObserver()
    et.register_callback(f"{output_path}/host.{rank}.json")

    def device_trace_handler(prof: profile) -> None:
        """Export the device (Kineto) trace as a chrome trace."""
        prof.export_chrome_trace(f"{output_path}/device.{rank}.json")

    activities = [torch.profiler.ProfilerActivity.CPU]
    if use_cuda:
        activities.append(torch.profiler.ProfilerActivity.CUDA)

    with profile(
        activities=activities,
        schedule=torch.profiler.schedule(wait=wait_iters, warmup=warmup_iters, active=active_iters),
        on_trace_ready=device_trace_handler,
        record_shapes=True,
        execution_trace_observer=et,
    ) as prof:
        print("Starting training loop...")
        for step_id in range(total_steps):
            # Generate random input data
            x = torch.randn(batch_size, input_size, device=device)
            target = torch.randn(batch_size, output_size, device=device)

            # Forward pass
            output = model(x)

            # Compute loss
            loss = criterion(output, target)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if use_cuda:
                # Ensure all CUDA operations are complete before moving to the next step.
                torch.cuda.synchronize()

            print(f"Iteration {step_id + 1}/{total_steps}, Loss: {loss.item():.4f}")

            # Mark the end of a step
            prof.step()

    et.stop()
    et.unregister_callback()
    print("Training complete!")


# Run the training loop only when executed as a script.
if __name__ == "__main__":
    main()
Loading