[Fix] Fix tutorial async_ppo script and doc structure (#63)

* test env setup

* .

* fix a missing cherry-pick
Wei Fu authored on 2025-06-01 15:46:46 +08:00, committed by GitHub
parent 4fab3ac769
commit afe5a2c880
10 changed files with 66 additions and 242 deletions

View File

@ -1,108 +0,0 @@
# Training
## Launch the Ray Cluster
### Start the Ray Head Node
On the first node, start the Ray Head with the following command:
```bash
docker run -d --name r1-ray-head --privileged --gpus all --network host --shm-size 700g -v /storage:/storage ghcr.io/inclusionai/areal-runtime:v0.3.0 /bin/bash -c "ray start --head --port=6379 && tail -f /dev/null"
```
### Start Ray Worker Nodes
On all other nodes, start the Ray Worker with the following command (skip this step for single-node setups):
```bash
# Replace with the actual IP address of the first node
RAY_HEAD_IP=xxx.xxx.xxx.xxx
docker run -d --name r1-ray-worker --privileged --gpus all --network host --shm-size 700g -v /storage:/storage ghcr.io/inclusionai/areal-runtime:v0.3.0 /bin/bash -c "ray start --address=$RAY_HEAD_IP:6379 && tail -f /dev/null"
```
### Verify Cluster Status
Once all nodes are running, check the Ray cluster status by entering the container on the first node:
```bash
docker exec -it r1-ray-head bash
ray status
```
You should see the Ray resource status displayed.
## Launch an Experiment
On the first node (where the Ray Head is located), run the following to launch an asynchronous PPO experiment:
```bash
docker exec -it r1-ray-head bash
cd /storage/codes/AReaL
pip3 install -e .
python3 training/main_async_ppo.py --config-name=async-ppo-1.7b-gpu8
```
This command will locate the YAML configuration file `async-ppo-1.7b-gpu8.yaml` in the `training/configs/async-ppo` folder. The meaning of each configuration entry can be found in `realhf/api/cli_args.py`. You can run asynchronous PPO, synchronous PPO, or SFT depending on the script you execute.
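Any configuration entry can also be overridden from the command line using Hydra's `key=value` syntax, without editing the YAML file. For instance (the override values below are illustrative, not recommendations):
```bash
# Illustrative Hydra overrides; any entry from the YAML config can be set this way.
python3 training/main_async_ppo.py --config-name=async-ppo-1.7b-gpu8 \
    trial_name=my-trial \
    dataset.train_bs_n_seqs=256 \
    ppo.gen.max_new_tokens=8192
```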
After starting, you'll see training launch information like this:
```
20250528-17:12:16.804 quickstart INFO: Running async-ppo-math experiment.
20250528-17:12:16.804 quickstart INFO: Logs will be dumped to /storage/experiments/logs/admin/async-ppo-1.7b-gpu8/my-trial
20250528-17:12:16.804 quickstart INFO: Experiment configs will be dumped to /storage/experiments/logs/admin/async-ppo-1.7b-gpu8/my-trial/config.yaml
20250528-17:12:16.804 quickstart INFO: Model checkpoints will be saved to /storage/experiments/checkpoints/admin/async-ppo-1.7b-gpu8/my-trial
20250528-17:12:19.261 quickstart INFO: Launching experiments with RAY...
```
**Note**: The saved YAML configuration at `/storage/experiments/logs/admin/async-ppo-1.7b-gpu8/my-trial/config.yaml` can be used to reproduce previous experiments.
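One straightforward way to reproduce a run is to copy the dumped config back into the Hydra config folder under a new name and launch with that name. This is a sketch of the workflow, not a dedicated resume command, and the file name below is arbitrary:
```bash
# Reuse the dumped config from a previous trial as a new named config.
cp /storage/experiments/logs/admin/async-ppo-1.7b-gpu8/my-trial/config.yaml \
   training/configs/async-ppo/repro-my-trial.yaml
python3 training/main_async_ppo.py --config-name=repro-my-trial
```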
## Command Line Options
To view all available options:
```bash
python3 -m realhf.apps.quickstart async-ppo-math --help
```
### Important Parameters
- **`mode`**: Always set to `ray`. Do not change this value when following this tutorial.
- **`{actor|critic|ref}.path`**: The path to the model files.
- **`dataset.path`**: The path to the dataset JSONL file.
- **`cluster.fileroot`**: The root path for saving training outputs.
- **`n_nodes`**: The number of nodes in the cluster.
- **`n_gpus_per_node`**: The number of GPUs per node.
- **`allocation_mode`**: The GPU allocation strategy and 3D parallelism configuration for the experiment. Format:
  - `sglang.d${DP1}m${TP1}p${PP1}+d${DP2}m${TP2}p${PP2}`: Configures the parallel strategies for SGLang generation (first term) and training (second term), where `d`, `m`, and `p` denote data, tensor (model), and pipeline parallelism. Generation and training use separate GPU sets, so the total GPU count must satisfy DP1×TP1×PP1 + DP2×TP2×PP2 = #GPUs. See the worked example below.
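For example, on a single node with 8 GPUs, `allocation_mode=sglang.d4m1p1+d2m2p1` dedicates 4×1×1 = 4 GPUs to SGLang generation and 2×2×1 = 4 GPUs to training, for a total of 8. The two products must always sum to `n_nodes × n_gpus_per_node`.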
### Training Control Parameters
- **`exp_ctrl.total_train_epochs`**: Number of training epochs (complete dataset iterations).
- **`exp_ctrl.save_freq_{epochs|steps|secs}`**: Frequency for saving model parameters to persistent storage. Set to null to disable saving.
- **`exp_ctrl.ckpt_freq_{epochs|steps|secs}`**: Frequency for saving temporary parameters for restart capability.
- **`dataset.train_bs_n_seqs`**: Training batch size (number of prompts sampled per training iteration).
- **`group_size`**: Number of responses sampled per prompt.
- **`{actor_train|ref_inf|actor_inf}.mb_spec.max_tokens_per_mb`**: Maximum tokens per mini-batch for forward/backward passes during reference model inference and actor model training. Reduce to avoid OOM errors.
- **`ppo.ppo_n_minibatches`**: Number of mini-batches for dividing data during each PPO update.
- **`ppo.recompute_logprob`**: Whether to recompute the log-probabilities of sampled responses with the actor model and use them as the proximal log-probabilities for training, instead of reusing the values returned by the generation server.
- **`ppo.use_decoupled_loss`**: Use decoupled loss to stabilize asynchronous training.
- **`ppo.gen.max_new_tokens`**: Maximum tokens to generate per prompt (default: 16k).
- **`ppo.gen.min_new_tokens`**: Minimum tokens to generate per prompt (default: 0).
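The excerpt below shows how the entries above typically appear in a config YAML. The values mirror one of the sample configurations in this repository and are only a starting point; tune them for your hardware and dataset:
```yaml
# Excerpt of training-control settings (starting-point values, not recommendations)
exp_ctrl:
  total_train_epochs: 5
  save_freq_steps: 20
  ckpt_freq_secs: 3600
dataset:
  train_bs_n_seqs: 512
group_size: 4
ppo:
  ppo_n_minibatches: 4
  recompute_logprob: true
  use_decoupled_loss: true
  gen:
    max_new_tokens: 4096
    min_new_tokens: 0
actor_train:
  mb_spec:
    max_tokens_per_mb: 32768
```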
## Monitoring the Training Process
We recommend using Weights & Biases (wandb) for monitoring. Run `wandb login` or set the `WANDB_API_KEY` environment variable, then set `wandb.mode=online` in your configuration to upload training statistics (see the example below).
The main log will be saved to `/storage/experiments/logs/admin/async-ppo-1.7b-gpu8/my-trial/main.log` and contains the statistics uploaded to wandb.
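A minimal wandb setup inside the container might look like this (the API key is a placeholder):
```bash
# Authenticate once per container, then enable uploads via the wandb.mode override.
export WANDB_API_KEY=<your-api-key>   # alternatively, run: wandb login
python3 training/main_async_ppo.py --config-name=async-ppo-1.7b-gpu8 wandb.mode=online
```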
### Key Training Statistics
- **`Epoch 1/5`**: The experiment trains for 5 epochs in total and is currently in the 1st.
- **`step 6/19`**: The current epoch consists of 19 steps, and the 6th has just finished.
- **`global step 6`**: Step count accumulated across all epochs.
- **`task_reward`**: Average reward value of all sampled responses in this step. Should steadily increase during training and eventually stabilize.
- **`importance_weight`**: Average importance sampling ratio across all tokens in the PPO loss. Typically close to 1.0.
- **`actor_clip_ratio`**: Ratio of clipped tokens in PPO loss to total tokens. Usually less than 0.1.
- **`actor_loss`**: PPO loss value. **Does not show clear trends during training** and should not be used as a performance indicator.
- **`avg_seq_len`**: Average length of all sequences (prompts with sampled responses) in this step.
- **`no_eos_ratio`**: Ratio of sampled responses truncated due to exceeding maximum generation length. An increase indicates longer average response lengths.

View File

@ -955,10 +955,6 @@ class BaseExperimentConfig:
        default_factory=TensorBoardConfig,
        metadata={"help": "TensorBoard configuration. Only 'path' field required."},
    )
    image_name: Optional[str] = field(
        default=None,
        metadata={"help": "Docker image name for controller (SLURM mode only)."},
    )
    recover_mode: str = field(
        default="disabled",
        metadata={

View File

@ -36,7 +36,6 @@ def _submit_workers(
    worker_type: str,
    scheduling_configs: List[config_package.TasksGroup],
    environs: Dict[str, str],
    image_name: Optional[str] = None,
) -> List[str]:
    if len(scheduling_configs) == 0:
        return []
@ -52,7 +51,7 @@ def _submit_workers(
        nodelist = sch_cfg.scheduling.nodelist
        exclude = sch_cfg.scheduling.exclude
        container_image = image_name or sch_cfg.scheduling.container_image
        container_image = sch_cfg.scheduling.container_image
        scheduled_jobs.append(
            sched.submit_array(
@ -214,7 +213,7 @@ def main_start(args, job_group_id: str = "", recover_count: int = 0):
            gpu=0,
            mem=1024,
            env_vars=BASE_ENVIRONS,
            container_image=args.image_name or setup.controller_image,
            container_image=setup.controller_image,
            time_limit=CONTROLLER_TIME_LIMIT,
        )
@ -235,7 +234,6 @@
        name,
        scheduling_setup,
        BASE_ENVIRONS,
        args.image_name,
    )
    try:
@ -367,13 +365,6 @@
        default="dev",
        help="slurm partition to schedule the trial",
    )
    subparser.add_argument(
        "--image_name",
        type=str,
        required=False,
        default=None,
        help="if specified, all workers will use this image. Useful in CI/CD pipeline.",
    )
    subparser.add_argument("--ignore_worker_error", action="store_true")
    subparser.add_argument(
        "--debug",

View File

@ -148,7 +148,7 @@ def init_constants(args: "BaseExperimentConfig"):
"REAL_DUMP_TRACE": os.getenv("REAL_DUMP_TRACE", "0"),
"REAL_DUMP_MEMORY": os.getenv("REAL_DUMP_MEMORY", "0"),
"REAL_GPU_MEMORY_KILL_THRESHOLD": os.getenv(
"REAL_GPU_MEMORY_KILL_THRESHOLD", "0.95"
"REAL_GPU_MEMORY_KILL_THRESHOLD", "1.0"
),
"LC_ALL": "C",
"LANG": "C",

View File

@ -96,8 +96,47 @@ def check_valid_model_and_path(role: str, model: ModelTrainEvalConfig, fileroot)
        snapshot_download(
            repo_id=model_name,
            local_dir=target_path,  # Replace '/' to avoid path issues
            local_dir_use_symlinks=False,
        )
        logger.info(f"Model downloaded successfully to: {target_path}")
        model.path = target_path


def _check_huggingface_cache(model_name: str) -> Optional[str]:
    """
    Check if a model exists in the HuggingFace cache.

    Args:
        model_name: The HuggingFace model identifier (e.g., 'bert-base-uncased')

    Returns:
        Optional[str]: Path to cached model if found, None otherwise
    """
    # Try to find the model files in cache
    # We'll check for common files that should exist in a model repo
    common_files = [
        "config.json",
        "pytorch_model.bin",
        "model.safetensors",
        "tf_model.h5",
    ]
    cached_path = None
    for filename in common_files:
        file_path = try_to_load_from_cache(
            repo_id=model_name, filename=filename, repo_type="model"
        )
        if file_path is not None:
            # Get the directory containing the cached file
            cached_path = os.path.dirname(file_path)
            break
    # Verify the cached directory exists and contains model files
    if cached_path and os.path.exists(cached_path):
        # Double-check that it's a valid model directory
        if any(os.path.exists(os.path.join(cached_path, f)) for f in common_files):
            return cached_path
    return None
        logger.info(f"Model downloaded successfully to: {target_path}")
        # Update the model object's path to point to the downloaded location

View File

@ -619,5 +619,5 @@ class CommonExperimentConfig(BaseExperimentConfig, Experiment):
        for alloc in rpc_allocs:
            check_valid_parallel_batch_size(alloc)
        for role, model in self.models.items():
            check_valid_model_and_path(role, model)
            check_valid_model_and_path(role, model, self.cluster.fileroot)
            check_valid_optimizer(model)

View File

@ -250,11 +250,25 @@ class GserverManager(AsyncWorker):
        # but we can actually update them individually
        new_param_path = self.check_new_params()
        if new_param_path is not None:
            tasks = [
                self.flush_requests_and_update_weights(base_url, new_param_path)
                for base_url in self.server_urls
            ]
            await asyncio.gather(*tasks)
            def _run_in_thread():
                # Create a new event loop for this thread
                new_loop = asyncio.new_event_loop()
                asyncio.set_event_loop(new_loop)
                tasks = [
                    self.flush_requests_and_update_weights(base_url, new_param_path)
                    for base_url in self.server_urls
                ]
                try:
                    return new_loop.run_until_complete(asyncio.gather(*tasks))
                finally:
                    new_loop.close()

            from concurrent.futures import ThreadPoolExecutor

            with ThreadPoolExecutor() as executor:
                future = executor.submit(_run_in_thread)
                _ = future.result()
            logger.info(f"Generation server updated weights from: {new_param_path}")

        if self.schedule_policy == "least_token_usage":
@ -460,7 +474,7 @@ class GserverManager(AsyncWorker):
                self.rollout_stat.accepted += 1
                logger.debug(
                    f"Finish rollout for qid {resp_meta.qid}. "
                    f"Running: {self.rollout_stat.running}, "
                    f"Submit: {self.rollout_stat.submitted}, "
                    f"running: {self.rollout_stat.running}, "
                    f"accepted: {self.rollout_stat.accepted}"
                )

View File

@ -1551,7 +1551,7 @@ class ModelWorker(worker_base.Worker):
            # Log GPU utilization and memory statistics.
            utilization = pynvml.nvmlDeviceGetUtilizationRates(self.__nvml_handle)
            memory_info = pynvml.nvmlDeviceGetMemoryInfo(self.__nvml_handle)  # bytes
            kill_threshold = float(os.environ.get("REAL_GPU_MEMORY_KILL_THRESHOLD", "0.95"))
            kill_threshold = float(os.environ.get("REAL_GPU_MEMORY_KILL_THRESHOLD", "1.0"))
            if memory_info.used / memory_info.total > kill_threshold:
                raise RuntimeError(
                    f"GPU memory exceeds kill threshold {kill_threshold:.2f}. "

View File

@ -1,108 +0,0 @@
actor:
  type:
    _class: qwen2
  path: /storage/models/deepseek-ai__DeepSeek-R1-Distill-Qwen-1.5B
  optimizer:
    type: adam
    lr: 1.0e-05
    weight_decay: 0.05
    beta1: 0.9
    beta2: 0.95
    eps: 1.0e-05
    min_lr_ratio: 0.0
    lr_scheduler_type: constant
    warmup_steps_proportion: 0.001
    offload: false
    initial_loss_scale: 4294967296.0
    min_loss_scale: 1.0
    loss_scale_window: 5.0
    hysteresis: 2
    gradient_clipping: 1.0
  sglang:
    attention_backend: flashinfer
    context_length: 32768
    mem_fraction_static: 0.8
    max_running_requests: null
    max_prefill_tokens: 32768
actor_train:
  mb_spec:
    n_mbs: 1
    max_tokens_per_mb: 32768
actor_inf:
  mb_spec:
    n_mbs: 1
    max_tokens_per_mb: 32768
dataset:
  path: /storage/datasets/boba_106k_0319_qwen3_think.jsonl
  max_prompt_len: 2048
  train_bs_n_seqs: 512
  fill_to_max_length: false
ppo:
  gen:
    n: 1
    max_new_tokens: 4096
    min_new_tokens: 0
    greedy: false
    top_p: 1.0
    top_k: 100000000
    temperature: 1.0
    use_cuda_graph: true
    force_cudagraph_recapture: true
    force_no_logits_mask: true
  ppo_n_minibatches: 4
  eps_clip: 0.2
  c_clip: null
  value_eps_clip: 0.2
  early_stop_imp_ratio: 5.0
  actor_sample_reuse: 1
  critic_sample_reuse: 1
  max_reward_clip: 20.0
  reward_output_scaling: 5.0
  reward_output_bias: -1.0
  fuse_rew_ref: true
  discount: 1.0
  gae_lambda: 1.0
  adv_norm: false
  kl_ctl: 0.0
  use_adaptive_kl_ctl: false
  disable_value: true
  recompute_logprob: true
  use_decoupled_loss: true
  behav_imp_weight_cap: null
group_size: 4
new_tokens_per_chunk: 1024
max_head_offpolicyness: 4
max_concurrent_rollouts: 256
flush_request_timeout: 300
experiment_name: multi-turn-rl-math
trial_name: my-trial
mode: ray
partition: dev
schedule_strategy: empty_first
wandb:
  mode: online
tensorboard:
  path: null
image_name: null
recover_mode: auto
recover_retries: 0
recover_after: 10
ignore_worker_error: false
allocation_mode: sglang.d32m1p1+d16p2m1
n_nodes: 8
n_gpus_per_node: 8
seed: 1
cache_clear_freq: 1
exp_ctrl:
  total_train_epochs: 5
  save_freq_steps: 20
  ckpt_freq_secs: 3600
torch_cache_mysophobia: true
shuffle_dataset: true
ray_temp_path: /tmp/ray
cluster:
  config_path: ''
  cluster_name: local
  fileroot: /storage/ray/experiments
turn_level_discount: 1.0
num_turns: 4

View File

@ -13,7 +13,7 @@ from realhf.experiments.async_exp.async_ppo_math_exp import AsyncPPOMATHConfig
from training.utils import run_experiment
@hydra.main(version_base=None, config_path="configs/async-ppo")
@hydra.main(version_base=None, config_path="configs", config_name="async-ppo")
def main_ppo_math(args):
    # NOTE: we import logging here to avoid hydra logging overwrite
    import realhf.base.logging as logging