diff --git a/.gitignore b/.gitignore index 8be933eb..6a4cb96b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,7 @@ __pycache__/ *.dot .pyre *et_def.pb.cc -*et_def.pb.h \ No newline at end of file +*et_def.pb.h + +trace_collection/post_execution/traces + diff --git a/trace_collection/post_execution/combine_trace.sh b/trace_collection/post_execution/combine_trace.sh new file mode 100644 index 00000000..caf0e755 --- /dev/null +++ b/trace_collection/post_execution/combine_trace.sh @@ -0,0 +1,79 @@ +set -ex pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TRACE_DIR="${TRACE_DIR:-${SCRIPT_DIR}/traces}" + +# --------------------------------------------------------------------------- +# Validate inputs +# --------------------------------------------------------------------------- +if [[ ! -d "${TRACE_DIR}" ]]; then + echo "[ERROR] Trace directory not found: ${TRACE_DIR}" + exit 1 +fi + +# Automatically detect number of ranks from host*.json files +NUM_RANKS=$(ls "${TRACE_DIR}"/host*.json 2>/dev/null | wc -l) +if [[ "${NUM_RANKS}" -eq 0 ]]; then + echo "[ERROR] No host_*.json files found in ${TRACE_DIR}" + exit 1 +fi +echo "[INFO] Found ${NUM_RANKS} rank(s) in ${TRACE_DIR}" + +# --------------------------------------------------------------------------- +# Step 1: chakra_trace_link (host + device → linked JSON in TRACE_DIR) +# --------------------------------------------------------------------------- +echo "" +echo "=== Step 1: chakra_trace_link ===" +for ((rank=0; rank ${LINKED_OUT}" + chakra_trace_link \ + --chakra-host-trace "${HOST_TRACE}" \ + --chakra-device-trace "${DEVICE_TRACE}" \ + --rank "${rank}" \ + --output-file "${LINKED_OUT}" +done +echo "[INFO] All ranks linked." + +# --------------------------------------------------------------------------- +# Step 2: chakra_converter (linked JSON → protobuf .et in TRACE_DIR) +# ASTRA-sim expects files named {prefix}.{npu_id}.et +# e.g. chakra_trace.0.et, chakra_trace.1.et, ... +# --------------------------------------------------------------------------- +echo "" +echo "=== Step 2: chakra_converter ===" +for ((rank=0; rank ${ET_OUT}" + chakra_converter --log-filename /dev/null \ + PyTorch \ + --input "${LINKED_IN}" \ + --output "${ET_OUT}" +done +echo "[INFO] All ranks converted." + +# --------------------------------------------------------------------------- +# Step 3: chakra_jsonizer (protobuf .et to JSON in TRACE_DIR) +# --------------------------------------------------------------------------- +echo "" +echo "=== Step 3: chakra_jsonizer ===" +for ((rank=0; rank ${JSON_OUT}" + chakra_jsonizer \ + --input "${ET_OUT}" \ + --output "${JSON_OUT}" +done +echo "[INFO] All ranks converted." + +echo "" +echo "=== Done ===" +echo " Files: chakra_trace.0.et ... chakra_trace.$((NUM_RANKS-1)).et" +echo "" \ No newline at end of file diff --git a/trace_collection/post_execution/sample_model.py b/trace_collection/post_execution/sample_model.py new file mode 100644 index 00000000..47dc60ef --- /dev/null +++ b/trace_collection/post_execution/sample_model.py @@ -0,0 +1,37 @@ +"""Simple neural network model for post-execution trace collection.""" + +import torch +import torch.nn as nn + + +class SampleModel(nn.Module): + """A simple neural network with linear layer and ReLU activation.""" + + def __init__(self, input_size: int = 1024, hidden_size: int = 256 * 1024, output_size: int = 256) -> None: + """ + Initialize the model. + + Args: + input_size: Size of input features + hidden_size: Size of hidden layer + output_size: Size of output layer + """ + super().__init__() + self.linear = nn.Linear(input_size, hidden_size) + self.linear2 = nn.Linear(hidden_size, output_size) + self.relu = nn.ReLU() + + def forward(self, x: torch.Tensor) -> torch.Tensor: + """ + Forward pass through the model. + + Args: + x: Input tensor + + Returns: + Output tensor after linear layer and ReLU + """ + x = self.linear(x) + x = self.relu(x) + x = self.linear2(x) + return x diff --git a/trace_collection/post_execution/simple_multirank.py b/trace_collection/post_execution/simple_multirank.py new file mode 100644 index 00000000..c175b8ae --- /dev/null +++ b/trace_collection/post_execution/simple_multirank.py @@ -0,0 +1,118 @@ +import os + +import torch +import torch.distributed as dist +import torch.nn as nn +from sample_model import SampleModel +from torch.nn.parallel import DistributedDataParallel as DDP +from torch.profiler import ExecutionTraceObserver, profile + +"""Simple script to collect post-execution traces with multiple ranks via torchrun.""" + +""" + # When running across multiple nodes + mpirun -np 4 -N 2 \ + torchrun \ + --nnodes=2 \ + --nproc_per_node=2 \ + --rdzv_backend=c10d \ + rdzv_endpoint=:29501 \ + --simple_multirank.py +""" +"""OR""" +""" + # When running on one node + torchrun \ + --nproc_per_node=2 \ + --simple_multirank.py +""" + +def main() -> None: + """Run 10 iterations of forward and backward passes.""" + batch_size = 64 + input_size = 1024 + output_size = 256 + output_path = "traces" + + # torchrun provides these environment variables for each process. + local_rank = int(os.environ.get("LOCAL_RANK", "0")) + if torch.cuda.is_available(): + backend = "nccl" + torch.cuda.set_device(local_rank) + device = f"cuda:{local_rank}" + else: + backend = "gloo" + device = "cpu" + + dist.init_process_group(backend=backend) + rank = dist.get_rank() + world_size = dist.get_world_size() + + model = SampleModel() + criterion = nn.MSELoss() + model.to(device) + if torch.cuda.is_available(): # noqa: SIM108 + model = DDP(model, device_ids=[local_rank], output_device=local_rank) + else: + model = DDP(model) + optimizer = torch.optim.SGD(model.parameters(), lr=0.01) + + # Initializing Host profiler and Kineto profiler + wait_iters = 0 + warmup_iters = 5 + active_iters = 5 + total_steps = wait_iters + warmup_iters + active_iters + + os.makedirs(output_path, exist_ok=True) + + et = ExecutionTraceObserver() + et.register_callback(f"{output_path}/host.{rank}.json") + + def device_trace_handler(prof): + prof.export_chrome_trace(f"{output_path}/device.{rank}.json") + + activities = [torch.profiler.ProfilerActivity.CPU] + if "cuda" in device: + activities.append(torch.profiler.ProfilerActivity.CUDA) + + with profile( + activities=activities, + schedule=torch.profiler.schedule(wait=wait_iters, warmup=warmup_iters, active=active_iters), + on_trace_ready=device_trace_handler, + record_shapes=True, + execution_trace_observer=et, + ) as prof: + print(f"Starting training loop on rank {rank}/{world_size} using {device}...") + for step_id in range(total_steps): + # Generate random input data + x = torch.randn(batch_size, input_size, device=device) + target = torch.randn(batch_size, output_size, device=device) + + # Forward pass + output = model(x) + + # Compute loss + loss = criterion(output, target) + + # Backward pass + optimizer.zero_grad() + loss.backward() + optimizer.step() + if "cuda" in device: + # Ensure all CUDA operations are complete before moving to the next step. + torch.cuda.synchronize() + + print(f"Rank {rank} Iteration {step_id + 1}/{total_steps}, Loss: {loss.item():.4f}") + + # Mark the end of a step + prof.step() + + et.stop() + et.unregister_callback() + dist.barrier() + dist.destroy_process_group() + print(f"Rank {rank} training complete!") + + +if __name__ == "__main__": + main() diff --git a/trace_collection/post_execution/simple_onerank.py b/trace_collection/post_execution/simple_onerank.py new file mode 100644 index 00000000..cbf8b948 --- /dev/null +++ b/trace_collection/post_execution/simple_onerank.py @@ -0,0 +1,84 @@ +import os + +import torch +import torch.nn as nn +from sample_model import SampleModel +from torch.profiler import ExecutionTraceObserver, profile + +"""Simple script to collect post-execution traces with one rank only(=compute only).""" + +""" + python simple_onerank.py +""" + +def main() -> None: + """Run 10 iterations of forward and backward passes.""" + batch_size = 64 + input_size = 1024 + output_size = 256 + output_path = "traces" + device = "cuda" if torch.cuda.is_available() else "cpu" + + model = SampleModel() + optimizer = torch.optim.SGD(model.parameters(), lr=0.01) + criterion = nn.MSELoss() + model.to(device) + + # Initializing Host profiler and Kineto profiler + rank = 0 # Single rank for compute-only + wait_iters = 0 + warmup_iters = 5 + active_iters = 5 + total_steps = wait_iters + warmup_iters + active_iters + + os.makedirs(output_path, exist_ok=True) + + et = ExecutionTraceObserver() + et.register_callback(f"{output_path}/host.{rank}.json") + + def device_trace_handler(prof): + prof.export_chrome_trace(f"{output_path}/device.{rank}.json") + + activities = [torch.profiler.ProfilerActivity.CPU] + if "cuda" in device: + activities.append(torch.profiler.ProfilerActivity.CUDA) + + with profile( + activities=activities, + schedule=torch.profiler.schedule(wait=wait_iters, warmup=warmup_iters, active=active_iters), + on_trace_ready=device_trace_handler, + record_shapes=True, + execution_trace_observer=et, + ) as prof: + print("Starting training loop...") + for step_id in range(total_steps): + # Generate random input data + x = torch.randn(batch_size, input_size, device=device) + target = torch.randn(batch_size, output_size, device=device) + + # Forward pass + output = model(x) + + # Compute loss + loss = criterion(output, target) + + # Backward pass + optimizer.zero_grad() + loss.backward() + optimizer.step() + if "cuda" in device: + # Ensure all CUDA operations are complete before moving to the next step. + torch.cuda.synchronize() + + print(f"Iteration {step_id + 1}/{total_steps}, Loss: {loss.item():.4f}") + + # Mark the end of a step + prof.step() + + et.stop() + et.unregister_callback() + print("Training complete!") + + +if __name__ == "__main__": + main()