Skip to content

Commit 64693b0

Browse files
Update the disagg multi_host script to auto-launch proxy (#1229)
1 parent f1dfbc5 commit 64693b0

File tree

3 files changed

+38
-9
lines changed

3 files changed

+38
-9
lines changed

examples/disagg/run_disagg_multi_host.sh

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ if [ -n "$CONTAINERS" ]; then
1212
docker rm -f $CONTAINERS
1313
fi
1414

15-
# NOTE: Strange ray race condition between dock build and ray start in single machine
16-
# if you find error, comments these line and rerun this .sh file
17-
# _wait_until_pg_ready(current_placement_group)
18-
# tpu-inference/tpu_inference/executors/ray_distributed_executor.py
15+
# update the BUILDKITE_COMMIT to the right commit hash
16+
# organize the docker_image in this way, which can share
17+
# the image for benchmark and buildkite code
18+
BUILDKITE_COMMIT='000'
19+
DOCKER_IMAGE="vllm-tpu:${BUILDKITE_COMMIT}"
1920

2021
docker image prune -f
21-
docker build -f docker/Dockerfile -t ullm:test .
22-
DOCKER_IMAGE="ullm:test"
22+
docker build -f docker/Dockerfile -t ${DOCKER_IMAGE} .
2323

2424
HOST_HF_HOME="/mnt/disks/data/hf-docker"
2525
NUM_HOSTS_PER_INSTANCE=4
@@ -117,7 +117,7 @@ for ((i=0; i<NUM_HOSTS_PER_INSTANCE; i++)); do
117117
tpu_index=$((i + NUM_HOSTS_PER_INSTANCE))
118118

119119
if [[ i -eq 0 ]]; then
120-
DOCKER_CMD="ray start --block --head --port=${DECODE_RAY_PORT}"
120+
DOCKER_CMD="ray start --block --head --port=${DECODE_RAY_PORT} --min-worker-port=20000 --max-worker-port=29999"
121121
else
122122
DOCKER_CMD="ray start --block --address=127.0.0.1:${DECODE_RAY_PORT}"
123123
fi
@@ -171,3 +171,28 @@ docker exec node-20 /bin/bash -c \
171171
--kv-transfer-config '{\"kv_connector\":\"TPUConnector\",\"kv_connector_module_path\":\"tpu_inference.distributed.tpu_connector\",\"kv_role\":\"kv_consumer\"}' \
172172
> /root/logs/decode.txt 2>&1 &"
173173
set +x
174+
175+
176+
# Start proxy server
177+
python $HOME/tpu-inference/examples/disagg/toy_proxy_server.py \
178+
--host localhost \
179+
--port 8000 \
180+
> $HOME/logs/proxy.txt 2>&1 &
181+
182+
183+
cat <<'EOF'
184+
The proxy server has been launched on: 127.0.0.1:8000
185+
186+
>> Send example request:
187+
188+
curl -X POST \
189+
http://127.0.0.1:8000/v1/completions \
190+
-H "Content-Type: application/json" \
191+
-d '{"prompt": "We hold these truths to be self-evident, that all men are created equal, that they are endowed by their Creator with certain unalienable Rights, that among these are Life, Liberty and the pursuit of Happiness.--That to secure these rights, Governments are instituted among Men, deriving their just powers from the consent of the governed, ", "max_tokens": 10}'
192+
193+
>> Stop the proxy server and all prefill/decode instances:
194+
195+
docker stop $(docker ps -a --filter "name=node*" -q)
196+
docker rm -f $(docker ps -a --filter "name=node*" -q)
197+
pkill -f "toy_proxy_server" && pkill -f "run_disagg_multi_host"
198+
EOF

tpu_inference/distributed/tpu_connector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,6 @@ def __init__(self, vllm_config: VllmConfig):
457457
self.side_channel_port = get_side_channel_port()
458458

459459
self.kv_transfer_server = None
460-
self._maybe_start_p2p_server()
461460
self.zmq_cxt = zmq.Context()
462461
if self.is_producer:
463462
ready_event = threading.Event()
@@ -499,6 +498,7 @@ def register_runner(self, runner: TPUModelRunner):
499498
self.shape = list(kv_layer.shape)
500499
self.dtype = kv_layer.dtype
501500
self.sharding = kv_layer.sharding
501+
self._maybe_start_p2p_server()
502502

503503
def _maybe_start_p2p_server(self):
504504
if self.kv_transfer_server is not None:

tpu_inference/executors/ray_distributed_executor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,14 @@ def _initialize_ray_cluster(self) -> None:
136136

137137
pp_size = self.parallel_config.pipeline_parallel_size
138138
placement_group_specs: List[Dict[str, float]] = []
139+
140+
ray_nodes = ray.nodes()
141+
logger.info(f"RayDistributedExecutor | ray_nodes={ray_nodes}")
142+
139143
if pp_size == 1:
140144
placement_group_specs = [{
141145
device_str: node['Resources'][device_str]
142-
} for node in ray.nodes()]
146+
} for node in ray_nodes]
143147
else:
144148
num_devices_per_pp_rank = self.vllm_config.sharding_config.total_devices
145149
placement_group_specs = [{

0 commit comments

Comments
 (0)