Merge remote-tracking branch 'backup/lite' into lcy/refactor

Author: 朱晗
Date: 2025-07-24 19:34:26 +08:00
Commit: 84be9c9baf
11 changed files with 1269 additions and 901 deletions

File diff suppressed because it is too large.


@@ -1,5 +1,5 @@
 <h1 align="center">
-<em>AReaL</em> v0.2: Training a SOTA 7B LRM with 1.5x Throughput Improvment
+<em>AReaL</em> v0.2: Training a SOTA 7B LRM with 1.5x Throughput Improvement
 </h1>
 <p align="center" style="font-size: 0.8em; color: #666;">


@@ -1,4 +1,6 @@
-# AReaL v0.3
+<h1 align="center">
+<em>AReaL</em> v0.3: SOTA Coding Models with 2.77x Faster Asynchronous RL Training
+</h1>
 
 ## Introduction


@@ -7,8 +7,8 @@ parts:
   - caption: Tutorial
     chapters:
       - file: tutorial/installation
-      - file: tutorial/quickstart_arealite
       - file: tutorial/quickstart
+      - file: tutorial/quickstart_legacy
       - file: tutorial/eval
       - file: tutorial/troubleshooting
   - caption: Getting Started with AReaLite


@@ -1 +1,497 @@
# Running GRPO on GSM8K Dataset
This guide walks through how AReaLite runs the GRPO algorithm on the GSM8K dataset, using
the training script
[examples/arealite/gsm8k_grpo.py](../../examples/arealite/gsm8k_grpo.py) and the
configuration file
[examples/arealite/configs/gsm8k_grpo.yaml](../../examples/arealite/configs/gsm8k_grpo.yaml).
## How AReaLite Works
The following figure illustrates the launching and one asynchronous training step of the
GRPO algorithm on the GSM8K dataset on AReaLite. Compared with the old AReaL
implementation, AReaLite runs inference servers and a SPMD training script instead of a
bunch of various workers. In a training step, AReaLite:
1. Submits prompts from the dataset to `RemoteSGLangEngine`, which runs `RLVRWorkflow`
   in a streaming manner.
1. Completes each `RLVRWorkflow` by interacting with remote `SGLangServer` instances to
   generate sequences and computing rewards with the reward function.
1. Once there are enough outputs from `RLVRWorkflow`, aggregates them into a data batch
   for the algorithm-specific training engine `FSDPPPOActor`.
1. Computes losses and updates weights in `FSDPPPOActor`.
1. Transfers the updated weights to the remote `SGLangServer` instances.
![arealite-gsm8k-example](gsm8k_grpo.png)
In the following sections, we walk through the code to explain these concepts and show
how each step is carried out in detail.
## Launching the Experiment
As shown in the [Quickstart Guide](../tutorial/quickstart.md), experiments in AReaLite
are launched using standalone launchers with the following commands:
```
# Local Launcher
python -m arealite.launcher.local <training script> --config <configuration file> <cli args>
# Ray Launcher
python -m arealite.launcher.ray <training script> --config <configuration file> <cli args>
# Slurm Launcher
python -m arealite.launcher.slurm <training script> --config <configuration file> <cli args>
```
In AReaLite:

- The **training script** is an SPMD Python script that serves as the experiment entry
  point.
- The launcher runs the training script with its distributed backend (`subprocess` for
  `LocalLauncher`, `ray.remote` for `RayLauncher`, `srun` for `SlurmLauncher`).
- The launcher also manages inference servers (currently only `SGLangServer` is
  supported). The number of servers and their parallelization strategies (e.g. tensor
  parallelism) are determined by the option
  [allocation_mode](../../arealite/api/cli_args.py#L797).
- For distributed launchers (`RayLauncher` and `SlurmLauncher`), inference servers run
  with a wrapper,
  [arealite/launcher/sglang_server.py](../../arealite/launcher/sglang_server.py), that
  handles addresses and ports in distributed settings.
- After the `SGLangServer` instances are started, the launcher collects their addresses
  and ports and sets the `AREAL_LLM_SERVER_ADDRS` environment variable so that training
  scripts can access these inference servers (see the sketch below).
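For illustration, a training process could read that variable roughly as follows. This is
a minimal sketch: the comma-separated `host:port` format is an assumption made here, not
a documented contract, so check the launcher implementation before relying on it.

```python
import os

# Hypothetical sketch: the launcher exports the server addresses; the exact
# separator/format is an assumption for illustration only.
addrs = os.environ.get("AREAL_LLM_SERVER_ADDRS", "").split(",")
print(f"Discovered {len(addrs)} inference server(s): {addrs}")
```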
The **configuration file** is a YAML file that sets the options defined in
[arealite/api/cli_args.py](../../arealite/api/cli_args.py). It can be overridden via CLI
arguments such as `actor.path=Qwen/Qwen3-1.7B` and `+sglang.attention_backend=triton`.
The training script parses the YAML file together with the CLI overrides into the config
class defined in [arealite/api/cli_args.py](../../arealite/api/cli_args.py).
```python
config, _ = load_expr_config(args, GRPOConfig)
config: GRPOConfig
```
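After parsing, nested options are available as attributes on the config object. For
example (both fields appear in the YAML and CLI examples used throughout this guide):

```python
# A quick sanity check on two options referenced elsewhere in this guide.
print(config.actor.path)                # e.g. "Qwen/Qwen3-1.7B"
print(config.train_dataset.batch_size)  # global batch size, split across data-parallel ranks
```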
## Loading and Preprocessing Dataset
We use the `datasets` and `torchdata` packages to load and preprocess the dataset into
our dataloader. First, we download `openai/gsm8k` from the Hugging Face Hub and split it
across data-parallel ranks, then map it into our desired format:
```python
def process_gsm8k_rl_dataset(dataset: Dataset):
    def process(sample):
        messages = [{"role": "user", "content": sample["question"]}]
        return {"messages": messages}

    dataset = dataset.map(process).remove_columns(["question"])
    return dataset


def get_gsm8k_dataset(split, rank, world_size):
    dataset = load_dataset(path="openai/gsm8k", name="main", split=split)
    dataset = split_dataset_by_node(dataset, rank=rank, world_size=world_size)
    return process_gsm8k_rl_dataset(dataset)
```
We then prepare training and evaluation dataloaders with `torchdata.StatefulDataLoader`:
```python
train_dataloader = torchdata.StatefulDataLoader(
    get_gsm8k_dataset("train", rank, world_size),
    batch_size=config.train_dataset.batch_size // world_size,
    shuffle=config.train_dataset.shuffle,
    num_workers=config.train_dataset.num_workers,
    collate_fn=lambda x: x,
    drop_last=config.train_dataset.drop_last,
)
valid_dataloader = ...
```
If you wish to use your own Hugging Face datasets or datasets stored on your local disk,
please refer to [Customization: Dataset](../customization/dataset.md) for further
details.
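For reference, after preprocessing each sample is a plain dictionary. A quick inspection
might look like this (a sketch assuming the standard GSM8K `question`/`answer` columns):

```python
ds = get_gsm8k_dataset("train", rank=0, world_size=1)
sample = ds[0]
# "question" has been replaced by a chat-style "messages" field; the original
# "answer" column is kept so the reward function can use it later.
print(sample["messages"])  # [{"role": "user", "content": "..."}]
print(sample["answer"])    # reference solution text
```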
## Rollout
The data lifecycle is controlled by an `RLVRWorkflow`, which defines how data progresses
from prompts to complete rollout data containing all fields required for training. Our
example shows a single-turn RLVR workflow with a math reward function. The core logic of
the workflow is implemented in the async method `arun_episode`, which takes a prompt,
generates answers with `RemoteSGLangEngine`, computes rewards, and populates additional
fields to produce the finalized training data.
```python
class RLVRWorkflow(RolloutWorkflow):
    def __init__(
        self, reward_fn, gconfig, tokenizer, ...
    ):
        self.reward_fn = reward_fn
        self.gconfig = gconfig
        self.tokenizer = tokenizer

    async def arun_episode(self, engine, data):
        # Roll out data with the inference engine
        input_ids = self.tokenizer.apply_chat_template(data["messages"], ...)
        req = LLMRequest(rid=..., input_ids=input_ids, gconfig=self.gconfig.new(n_samples=1))
        resps = await asyncio.gather(
            *[engine.agenerate(req) for _ in range(self.gconfig.n_samples)]
        )
        # Post-process rollout responses
        results = []
        for resp in resps:
            reward = self.reward_fn(...)
            ...  # compute other fields required for training
            res = dict(
                input_ids=...,
                rewards=...,
                ...  # other fields required for training
            )
            results.append(res)
        # Return `self.gconfig.n_samples` padded samples for the prompt `data["messages"]`
        return concat_padded_tensors(results)


def gsm8k_reward_fn(completions, answer):
    ...


tokenizer = load_hf_tokenizer(config.tokenizer_path)
workflow = RLVRWorkflow(
    reward_fn=gsm8k_reward_fn,
    gconfig=config.gconfig,
    tokenizer=tokenizer,
    ...
)
```
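The body of `gsm8k_reward_fn` is elided above. For intuition only, a toy verifier for
GSM8K could compare the last number in the completion with the number after the `####`
marker in the reference answer. The sketch below is illustrative and is not the
implementation used in the example:

```python
import re

def toy_gsm8k_reward(completion: str, answer: str) -> float:
    """Illustrative only: 1.0 if the final number in the completion matches
    the GSM8K ground truth that follows '####', else 0.0."""
    ground_truth = answer.split("####")[-1].strip().replace(",", "")
    numbers = re.findall(r"-?\d+(?:\.\d+)?", completion.replace(",", ""))
    return 1.0 if numbers and numbers[-1] == ground_truth else 0.0
```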
In AReaLite, generation tasks are offloaded to remote inference servers, which run on
GPUs separate from those used for training. `RemoteSGLangEngine` acts as a client that
interacts with these servers. It runs in an SPMD manner on every training process,
without occupying any GPUs.
`RemoteSGLangEngine` manages the data streaming through rollout workflows and collates
completed rollout data into batched training samples. Upon initialization, it launches a
rollout thread that runs rollout workflows as `asyncio` tasks. The following code shows a
simplified version of the rollout thread, which iteratively:

- Checks the available capacity. The capacity limits the number of concurrent rollout
  workflows to control concurrency and data off-policyness.
- If there is capacity left and rollout is not paused for a weight update, keeps fetching
  data from `input_queue` and creates `asyncio` tasks to run the workflows.
- Waits for rollout workflows to finish.
- Gathers data from finished workflows and puts the results into `output_queue`.
```python
class RemoteSGLangEngine(InferenceEngine):
    ...

    async def _rollout_thread_async(self):
        rid = 0
        rollout_tasks = {}  # rid -> running asyncio task
        try:
            while not self.exiting.is_set():
                # Check capacity
                capacity = self.get_capacity()
                # Create rollout tasks with data obtained from input_queue
                while (
                    capacity > 0
                    and not self.paused.is_set()
                    and self.input_queue.qsize() > 0
                ):
                    data, workflow = self.input_queue.get_nowait()
                    task = asyncio.create_task(
                        workflow.arun_episode(self, data), name=str(rid)
                    )
                    rollout_tasks[str(rid)] = task
                    self.rollout_stat.submitted += 1
                    self.rollout_stat.running += 1
                    capacity -= 1
                    rid += 1
                # Wait for rollout completion
                tasks = list(rollout_tasks.values())
                done = []
                if tasks:
                    done, _ = await asyncio.wait(
                        tasks,
                        timeout=ROLLOUT_POLL_WAIT_TIME,
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                # Collect finished results and put them into output_queue
                for task in done:
                    traj = await task
                    task_rid = task.get_name()
                    rollout_tasks.pop(task_rid)
                    self.rollout_stat.accepted += 1
                    self.output_queue.put_nowait(traj)
                    self.rollout_stat.running -= 1
                await asyncio.sleep(1)
        ...
```
With this rollout thread running, the training script (the main thread) submits prompts
into `input_queue` and collates rollout data from `output_queue` into training batches
with `prepare_batch` (for asynchronous RL) or `rollout_batch` (for synchronous RL). The
following code shows the implementation of `prepare_batch`:
```python
def prepare_batch(
    self,
    dataloader: StatefulDataLoader,
    workflow: "RolloutWorkflow",
):
    if not hasattr(self, "data_generator"):
        self.data_generator = iter(dataloader)
    assert dataloader.batch_size is not None
    while True:
        # Submit at least two batches to allow maximum overlap
        if (
            self.get_capacity() + dataloader.batch_size > 0
            and self.input_queue.qsize() + dataloader.batch_size
            < self.input_queue.maxsize
        ):
            try:
                data = next(self.data_generator)
            except StopIteration:
                self.data_generator = iter(dataloader)
                data = next(self.data_generator)
            for item in data:
                # Submit data into input_queue
                self.submit(item, workflow=workflow)
        try:
            # Wait for dataloader.batch_size samples from output_queue
            return self.wait(dataloader.batch_size, timeout=1)
        except TimeoutError:
            pass
```
The usage of `RemoteSGLangEngine` in the training script is simple:
```python
rollout = RemoteSGLangEngine(config.rollout)
rollout.initialize()
eval_rollout = ...

data_generator = iter(train_dataloader)
for global_step in range(max_steps):
    # Roll out a batch of training data for the current step
    if config.async_training:
        batch = rollout.prepare_batch(train_dataloader, workflow=workflow)
    else:
        try:
            data = next(data_generator)
        except StopIteration:
            data_generator = iter(train_dataloader)
            data = next(data_generator)
        batch = rollout.rollout_batch(data, workflow=workflow)
```
If you want to use rollout workflows with custom reward functions or agentic tool
calling, see [Customization: Rollout Workflows](../customization/agent.md) for more
details.
## Training
After obtaining the training batch, we use `FSDPPPOActor` to compute losses and update
weights. Each train engine corresponds to one model; therefore, we need an additional
engine for the reference model. Note that `torch.distributed` process groups are lazily
initialized via `init_process_group` when the first train engine is initialized.
Initializing a train engine also loads model weights from the path specified in the
configuration.
```python
actor = FSDPPPOActor(config=config.actor)
actor.initialize(None, ft_spec)
ref = None
if config.actor.kl_ctl > 0 and config.ref is not None:
    ref = FSDPPPOActor(config=config.ref)
    ref.initialize(None, ft_spec)
```
`FSDPPPOActor` is a high-level engine with algorithm-specific APIs such as
`compute_logp`, `compute_advantages`, and `ppo_update`. `FSDPPPOActor` is powered by the
lower-level train engine `FSDPEngine`, which uses **PyTorch FSDP2** to provide basic APIs
for the model such as `train_batch` and `forward`. The following code shows a GRPO
training step:
```python
logp = actor.compute_logp(batch)
batch["prox_logp"] = logp
if ref is not None:
    batch["ref_logp"] = ref.compute_logp(batch)
    log_gpu_stats("ref logp")
actor.compute_advantages(batch)
stats = actor.ppo_update(batch)
actor.step_lr_scheduler()
```
If you want to customize your own training algorithm, see
[Customize algorithms](../customization/algorithm.md) for more details.
## Transferring Weights to Inference Servers
After training, we transfer the updated model weights to the remote inference servers
through cooperation between `FSDPPPOActor` and `RemoteSGLangEngine`. Weights can be
transferred either through shared storage or via NCCL. In our example training script, we
first prepare a `WeightUpdateMeta` for the NCCL backend on all training processes.
```python
# NOTE: Weight update meta only requires address and free port of rank 0,
# but `WeightUpdateMeta.from_fsdp_nccl` has to be executed on all ranks
# due to `engine.get_param_specs()`.
# Therefore, we create weight update meta on all ranks, then broadcast the one on rank 0.
weight_update_meta = [
    WeightUpdateMeta.from_fsdp_nccl(
        AllocationMode.from_str(config.allocation_mode), actor
    )
]
dist.broadcast_object_list(weight_update_meta, src=0)
weight_update_meta = weight_update_meta[0]
```
If you wish to transfer model weights from shared storage, you can use:
```python
weight_update_meta = WeightUpdateMeta.from_disk(config.saver)
```
After a training step finishes, we transfer the new weights from the actor engine to the
remote inference servers, following the steps shown in the code below:
```python
# 1. Pause rollout on remote inference servers
rollout.pause()
# 2. Send requests that tell the remote servers to update weights
if dist.get_rank() == 0:
    future = rollout.update_weights(weight_update_meta)
# 3. The actor begins to transfer weights
actor.upload_weights(weight_update_meta)
# 4. Wait for the remote servers to finish updating
if dist.get_rank() == 0:
    future.result()
# 5. Synchronize training processes for the model version update
dist.barrier(device_ids=[actor.device.index])
torch.cuda.synchronize()
# 6. Resume rollout on remote inference servers
rollout.resume()
# 7. Set versions so that the actor and rollout engine stay in sync
actor.set_version(global_step + 1)
rollout.set_version(global_step + 1)
```
Now a complete GRPO training step in AReaLite is done! The core logic of our example
training script can be summarized as:
```python
data_generator = iter(train_dataloader)
for global_step in range(max_steps):
    if config.async_training:
        batch = rollout.prepare_batch(train_dataloader, workflow=workflow)
    else:
        try:
            data = next(data_generator)
        except StopIteration:
            data_generator = iter(train_dataloader)
            data = next(data_generator)
        batch = rollout.rollout_batch(data, workflow=workflow)

    logp = actor.compute_logp(batch)
    batch["prox_logp"] = logp
    if ref is not None:
        batch["ref_logp"] = ref.compute_logp(batch)
        log_gpu_stats("ref logp")
    actor.compute_advantages(batch)
    stats = actor.ppo_update(batch)
    actor.step_lr_scheduler()

    rollout.pause()
    if dist.get_rank() == 0:
        future = rollout.update_weights(weight_update_meta)
    actor.upload_weights(weight_update_meta)
    if dist.get_rank() == 0:
        future.result()
    rollout.resume()
    actor.set_version(global_step + 1)
    rollout.set_version(global_step + 1)
```
## Utilities
AReaLite provides a range of utilities covering the basic functionality needed to observe
and tune your experiments.
### `Saver` and `Evaluator`
`Saver` ([arealite/utils/saver.py](../../arealite/utils/saver.py)) and `Evaluator`
([arealite/utils/evaluator.py](../../arealite/utils/evaluator.py)) manage how frequently
the model is saved and evaluated with the train engine.

In our example, we call `saver.save` and `evaluator.evaluate` after every training step.
These two methods automatically check whether it is time to save or evaluate the model,
according to the experiment configuration.
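As an illustration, the per-step calls sit at the end of the training loop. The argument
lists below are assumptions made for this sketch, not the exact signatures; see the two
utility modules linked above for the real interfaces.

```python
# Hypothetical sketch: both utilities check the configured frequency internally
# and return immediately if it is not yet time to save or evaluate.
saver.save(actor, epoch, step, global_step)                     # argument list assumed
evaluator.evaluate(valid_dataloader, epoch, step, global_step)  # argument list assumed
```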
### `stats_tracker`
`stats_tracker` ([realhf/base/stats_tracker.py](../../realhf/base/stats_tracker.py))
gathers training statistics across parallel ranks and reduces them.

1. **Scalar-type statistics** are recorded with `stats_tracker.scalar(key=value)` and are
   averaged over the number of scalars recorded under the same key when reduced.
1. **Tensor-type statistics** require a `denominator` and a `reduce_type` to decide how
   statistics under the same key are reduced.

- `denominator` is a boolean tensor that masks out the elements of the tensor that we do
  not want to record.
- `reduce_type` can be average, sum, min, or max. By default, the average, min, and max
  are all calculated.
For example, if we want to record the length of sequences with correct and incorrect
answers in a training batch:
```python
seqlens = ...  # tensor of shape [#seqs,]
reward_score = ...  # tensor of shape [#seqs,]
result_denominators = {
    "correct_n_seqs": (reward_score > 0).bool(),
    "incorrect_n_seqs": (reward_score <= 0).bool(),
}
# register the denominators
stats_tracker.denominator(**result_denominators)
# record the correct and incorrect sequence lengths
stats_tracker.stat(
    correct_seq_len=seqlens.float(), denominator="correct_n_seqs"
)
stats_tracker.stat(
    incorrect_seq_len=seqlens.float(), denominator="incorrect_n_seqs"
)
```
`stats_tracker` offers a timer context that records the time cost of a code block as a
scalar, and a scope context that manages the keys of statistics.
```python
with stats_tracker.record_timing("train_step"):
    # training step
    ...

with stats_tracker.scope("A"):
    stats_tracker.scalar(c=123)  # key="A/c", value=123
    with stats_tracker.scope("B"):
        stats_tracker.scalar(c=234)  # key="A/B/c", value=234
```
After recording sufficient data, e.g. after a `train_batch` is finished,
`stats_tracker.export` is called to aggregate all statistics and dump them into a
dictionary.
```python
stats = stats_tracker.export()
```
### `StatsLogger`
`StatsLogger` ([arealite/utils/stats_logger.py](../../arealite/utils/stats_logger.py))
logs the gathered training data to recorders such as `wandb` and `tensorboard` on rank 0.
In our example script, after each training step,
`logger.commit(epoch, step, global_step, stats)` is called to record all statistics from
`stats_tracker`, printing them as well as logging them to the recorders set by the
configuration.
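Putting the last two utilities together, the end of a training step exports the
accumulated statistics and hands them to the logger (the calls are those named in this
section; the loop variables come from the training script):

```python
# Export the reduced statistics from stats_tracker, then record them on rank 0.
stats = stats_tracker.export()
logger.commit(epoch, step, global_step, stats)
```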
## Next Steps
- [Customize dataset](../customization/dataset.md)
- [Customize Agentic/RLVR rollout workflows](../customization/agent.md)
- [Customize algorithms](../customization/algorithm.md)

Binary file not shown (new image, 100 KiB).


@@ -146,7 +146,7 @@ class AsyncPPOMATHConfig(AsyncRLExperimentConfig, PPOMATHConfig):
 ## Step 4: Run Training
 
 Follow the standard training procedure outlined in the
-[quickstart guide](../../tutorial/quickstart.md). Launch your experiment with:
+[quickstart guide](../../tutorial/quickstart_legacy.md). Launch your experiment with:
 
 ```bash
 python3 training/main_async_ppo.py my_param=5.0 # plus any additional CLI arguments


@@ -1,123 +1,121 @@
[Full side-by-side diff omitted: `tutorial/quickstart.md` is rewritten. The former
"Quickstart (Legacy)" content is moved to `tutorial/quickstart_legacy.md` (new file shown
below), and the page is replaced by the AReaLite "Quickstart" guide previously located at
`tutorial/quickstart_arealite.md` (deleted file shown below), with internal links updated
to the new file names and a "Next Steps" pointer added to the AReaLite GSM8K GRPO
walkthrough (`../arealite/gsm8k_grpo.md`).]


@@ -1,101 +0,0 @@
# Quickstart
Welcome to the **AReaLite** Quickstart Guide!
This guide demonstrates how to run an AReaLite experiment training an LLM on the GSM8K dataset using the GRPO algorithm with function-based rewards.
Ensure you've completed [the installation and environment setup](installation.md) before proceeding.
## Running the Experiment (on a single node)
To run the experiment, you will need:
- Training script: [examples/arealite/gsm8k_grpo.py](../../examples/arealite/gsm8k_grpo.py)
- Config YAML: [examples/arealite/configs/gsm8k_grpo.yaml](../../examples/arealite/configs/gsm8k_grpo.yaml)
Our training scripts will automatically download the dataset (openai/gsm8k) and model (Qwen/Qwen2-1.5B-Instruct).
To run the example with default configuration, execute from the repository directory:
```
python3 -m arealite.launcher.local examples/arealite/gsm8k_grpo.py --config examples/arealite/configs/gsm8k_grpo.yaml experiment_name=<your experiment name> trial_name=<your trial name>
```
> **Note**: The command above uses `LocalLauncher`, which only works for a single node (`cluster.n_nodes == 1`). For distributed experiments, see [Distributed Experiments with Ray or Slurm](quickstart_arealite.md#distributed-experiments-with-ray-or-slurm).
## Modifying configuration
All available configuration options are listed in [arealite/api/cli_args.py](https://github.com/inclusionAI/AReaL/blob/main/arealite/api/cli_args.py).
To customize the experiment (models, resources, algorithm options), you can:
1. Edit the YAML file directly at [examples/arealite/configs/gsm8k_grpo.yaml](../../examples/arealite/configs/gsm8k_grpo.yaml).
2. Add command-line options:
- For existing options in the YAML file, directly add the option: `actor.path=Qwen/Qwen3-1.7B`.
- For other options in `cli_args.py`, but not in the YAML file, add with a prefix "+": `+sglang.attention_backend=triton`.
<!--
1. Adding command line options. For entries that exist in the config YAML, you could directly add the options after your command. For example: `actor.path=Qwen/Qwen3-1.7B`. For other options in `cli_args.py` but not in YAML, you could add these options with a prefix "+". For example: `+sglang.attention_backend=triton`.
-->
For example, here is the command to launch a customized configuration, based on our GSM8K GRPO example:
```
python3 -m arealite.launcher.local examples/arealite/gsm8k_grpo.py \
--config examples/arealite/configs/gsm8k_grpo.yaml \
experiment_name=<your experiment name> \
trial_name=<your trial name> \
allocation_mode=sglang.d2p1t1+d2p1t1 \
cluster.n_nodes=1 \
cluster.n_gpus_per_node=4 \
gconfig.max_new_tokens=2048 \
train_dataset.batch_size=1024 \
+sglang.attention_backend=triton
```
::::{important}
We're currently refactoring from legacy AReaL to AReaLite, which introduces some configuration differences. We provide a **config converter** to transfer old AReaL config into AReaLite YAML file for users' convenience. [Click here](xxx) to learn how to use the **config converter**.
::::
## Distributed Experiments with Ray or Slurm
AReaLite provides standalone launchers for distributed experiments. After setting up your Ray or Slurm cluster, launch experiments similarly to `LocalLauncher`:
```
# Launch with Ray launcher. 4 nodes (4 GPUs each), 3 nodes for generation, 1 node for training.
python3 -m arealite.launcher.ray examples/arealite/gsm8k_grpo.py \
--config examples/arealite/configs/gsm8k_grpo.yaml \
experiment_name=<your experiment name> \
trial_name=<your trial name> \
allocation_mode=sglang.d12p1t1+d4p1t1 \
cluster.n_nodes=4 \
cluster.n_gpus_per_node=4 \
...
# Launch with Slurm launcher. 16 nodes (8 GPUs each), 12 nodes for generation, 4 nodes for training
python3 -m arealite.launcher.slurm examples/arealite/gsm8k_grpo.py \
--config examples/arealite/configs/gsm8k_grpo.yaml \
experiment_name=<your experiment name> \
trial_name=<your trial name> \
allocation_mode=sglang.d96p1t1+d32p1t1 \
cluster.n_nodes=16 \
cluster.n_gpus_per_node=8 \
...
```
Additional references:
- For more options for launchers, check `LauncherConfig` in [arealite/api/cli_args.py](https://github.com/inclusionAI/AReaL/blob/main/arealite/api/cli_args.py).
- [Ray cluster setup guide](installation.md#optional-launch-ray-cluster-for-distributed-training) for a guide on how to set up a ray cluster.
> **Important Notes**:
> 1. Ensure `allocation_mode` matches your cluster configuration (`#GPUs == cluster.n_nodes * cluster.n_gpus_per_node`)
> 2. Ray/Slurm launchers only works for more than 1 node (`cluster.n_nodes > 1`). For single node scenario, please use `LocalLauncher`.
> 3. In Ray/Slurm launchers, GPUs are allocated at node granularity, which means #GPUs for generation or training must be integer multiples of `cluster.n_gpus_per_node`.
<!--
> **Notes**: Before launching distributed experiments, please check if your `allocation_mode` matches your cluster configuration. Make sure #GPUs allocated by `allocation_mode` equals to `cluster.n_nodes * cluster.n_gpus_per_node`.
> **Note**: Ray and Slurm launchers only work for distributed experiments with more than 1 node (`cluster.n_nodes > 1`). They allocate GPUs for training and generation at the granularity of **nodes**, which means the number of GPUs allocated for generation and training must be integer multiples of `cluster.n_gpus_per_node`.
-->
## Next Steps
<!--
1. Check [Getting Started with AReaLite](../arealite/gsm8k_grpo.md) for a complete code walkthrough on the GRPO GSM8K Example.
-->
Customization guides:
- [Custom dataset](../customization/dataset.md)
- [Custom agentic/RLVR rollout workflows](../customization/agent.md)
- [Custom algorithms](../customization/algorithm.md)


@@ -0,0 +1,169 @@
# Quickstart (Legacy)
> **Note**: This is a quickstart guide for launching AReaL experiments with the legacy
> code in `realhf/`. We strongly recommend users try AReaLite for a better experience.
> [Click here](quickstart.md) for the AReaLite quickstart guide!
This guide walks you through a simple example of training an LLM to solve math problems.
Please ensure you have properly
[installed dependencies and set up the runtime environment](installation.md) before
proceeding.
## Dataset
Use `huggingface-cli` to download our open-source dataset:
```bash
huggingface-cli download --repo-type=dataset inclusionAI/AReaL-RL-Data
```
> **Note**: The command above will display the path of the downloaded dataset. You'll
> need to pass this path to the training command.
## Model
We train using open-source models available on Hugging Face Hub. You can either download
the model in advance or use the model identifier when running the experiment.
```bash
# If you want to download it in advance
huggingface-cli download Qwen/Qwen3-1.7B
```
Refer to the
[official documentation](https://huggingface.co/docs/huggingface_hub/guides/cli) for
more information on using `huggingface-cli`.
## Training
From the repository directory, run:
```bash
# examples/run_async_ppo.sh
python3 training/main_async_ppo.py \
n_nodes=1 n_gpus_per_node=8 \
allocation_mode=sglang.d4p1m1+d2p2m1 \
cluster.fileroot=/path/to/save/logs/checkpoints/ \
actor.type._class=qwen3 \
actor.path=Qwen/Qwen3-1.7B \
ref.type._class=qwen3 \
ref.path=Qwen/Qwen3-1.7B \
dataset.path=/path/to/boba_106k_0319.jsonl \
dataset.train_bs_n_seqs=32 \
group_size=8 \
ppo.gen.max_new_tokens=4096 \
ppo.ppo_n_minibatches=4 \
actor_train.mb_spec.max_tokens_per_mb=32768 \
actor_inf.mb_spec.max_tokens_per_mb=32768 \
max_concurrent_rollouts=16 \
max_head_offpolicyness=4
```
::::{important}
Running `main_async_ppo.py` with `ppo.recompute_logprob=False`,
`ppo.use_decoupled_loss=False`, and `max_head_offpolicyness=0` will essentially
replicate the behavior of synchronous PPO. Therefore, it's usually not recommended to
run synchronous PPO directly (i.e., `main_sync_ppo.py`). The workflow of asynchronous RL
is more stable and easier to customize.
::::
## Command Line Options
To view all available options:
```bash
python3 training/main_sync_ppo.py --help
```
### Configuration Parameters
- **`experiment_name`**: The name of your project.
- **`trial_name`**: The name of this trial in your project.
- **`{actor|ref}.path`**: The path to the model files.
- **`dataset.path`**: The path to the dataset JSONL file.
- **`cluster.fileroot`**: The root path for saving training outputs (logs and
checkpoints).
- **`n_nodes`**: The number of nodes in the cluster.
- **`n_gpus_per_node`**: The number of GPUs per node.
- **`allocation_mode`**: The GPU allocation strategy and 3D parallelism configuration
for the experiment. Format:
- `sglang.d${DP1}m${TP1}p${PP1}+d${DP2}m${TP2}p${PP2}`: Configures parallel strategies
for SGLang generation and training respectively. Generation and training use
separate GPU sets, and the total GPU count must equal: DP1×TP1×PP1 + DP2×TP2×PP2 =
#GPUs.
### Training Control
- **`exp_ctrl.total_train_epochs`**: Number of training epochs (complete dataset
iterations).
- **`exp_ctrl.save_freq_{epochs|steps|secs}`**: Frequency for saving model parameters to
persistent storage. Set to null to disable saving.
- **`exp_ctrl.ckpt_freq_{epochs|steps|secs}`**: Frequency for saving temporary
parameters for restart capability.
- **`dataset.train_bs_n_seqs`**: Training batch size (number of prompts sampled per
training iteration).
- **`group_size`**: Number of responses sampled per prompt.
### Memory and Performance
- **`{actor_train|ref_inf|actor_inf}.mb_spec.max_tokens_per_mb`**: Maximum tokens per
mini-batch for forward/backward passes during reference model inference and actor
model training. Reduce this value to avoid OOM errors.
- **`max_concurrent_rollouts`**: The maximum number of concurrent rollouts. SGLang will
run out of memory if this value is too large. Defaults to `dataset.train_bs_n_seqs`.
### Algorithm Configuration
- **`max_head_offpolicyness`**: The allowed maximum data staleness. 0 recovers
synchronous training. A large value will increase generation throughput but degrade
final performance. We recommend keeping this value at 8 or below.
- **`ppo.recompute_logprob`**: Whether to compute proximal log probabilities for
training. Defaults to True for asynchronous experiments and False for synchronous
baselines.
- **`ppo.use_decoupled_loss`**: Use decoupled loss to stabilize asynchronous training.
Defaults to True.
- **`ppo.gen.max_new_tokens`**: Maximum tokens to generate per prompt.
- **`ppo.ppo_n_minibatches`**: Number of mini-batches for dividing data during each PPO
update.
- **`success_rate_ub`**: Upper bound of success rate. Prompts with a higher success rate
will be filtered out.
- **`success_rate_lb`**: Lower bound of success rate. Prompts with a lower success rate
will be filtered out.
## Monitoring the Training Process
- We recommend using [Weights & Biases (wandb)](https://github.com/wandb/wandb) or
[SwanLab](https://github.com/SwanHubX/SwanLab) for monitoring—run `wandb login` or
`swanlab login`, or set the corresponding environment variable API key
(`WANDB_API_KEY` or `SWANLAB_API_KEY`). Set `wandb.mode="online"` or
`swanlab.mode="cloud"` in your configuration to upload training statistics. If you
cannot connect to the server, you can also use `wandb.mode="offline"` or
`swanlab.mode="local"` to save data locally without uploading.
You can also use TensorBoard by setting the `tensorboard.path` parameter.
The main log will be saved to
`${fileroot}/logs/${USER}/${experiment_name}/${trial_name}/main.log` and contains the
statistics uploaded to wandb.
If SwanLab is enabled, logs will be saved to the directory specified by
`swanlab.logdir`.
### Key Training Statistics
- **`Epoch 1/5`**: Indicates the total epochs required and the current epoch being
trained.
- **`step 6/19`**: Shows that the current epoch has 19 steps, with the 6th step just
completed.
- **`global step 6`**: Step count across all epochs.
- **`ppo_actor/task_reward/avg`**: Average reward value of all sampled responses in this
step. This should steadily increase during training and eventually stabilize.
- **`ppo_actor/importance_weight/avg`**: Average importance sampling ratio across all
tokens in the PPO loss. This is typically close to 1.0.
- **`ppo_actor/actor_clip_ratio/avg`**: Ratio of clipped tokens in PPO loss to total
tokens. This is usually less than 0.1.
- **`ppo_actor/actor_loss/avg`**: PPO loss value. **This does not show clear trends
during training** and should not be used as a performance indicator.
## Next Steps
[Evaluate your model](eval.md) or check the
[troubleshooting section](troubleshooting.md) if you encounter any issues.


@@ -11,6 +11,7 @@ from typing import Any, List
 import psutil
 import ray
+from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
 
 from realhf.api.cli_args import NameResolveConfig
 from realhf.api.core.system_api import Experiment, ExperimentScheduling, TasksGroup
@@ -167,6 +168,19 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
     scheduling: ExperimentScheduling = exp_cfg.scheduling_setup()
+
+    # We assume all nodes have the same resources (CPU, GPU, memory).
+    all_available_resources = ray.available_resources()
+    all_available_nodes = [
+        k
+        for k in all_available_resources
+        if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
+    ]
+    n_nodes = len(all_available_nodes)
+    n_gpus_per_node = int(all_available_resources["GPU"] // n_nodes)
+    assert (
+        all_available_resources["GPU"] % n_nodes == 0
+    ), "AReaL assumes all nodes has the same number of GPUs."
 
     for worker_type in WORKER_TYPES:
         sch = getattr(scheduling, worker_type)
         if sch is None:
@@ -187,34 +201,73 @@
                 f"Please launch more Ray nodes otherwise the experiment will get stuck."
             )
 
-        # Use a customized packed scheduling method
-        # that sequentially allocates nodes.
-        available_nodes = [
-            k
-            for k in available_resources
-            if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
-        ]
-        total_gpus = available_resources["GPU"]
-        n_gpus_per_node = int(total_gpus // len(available_nodes))
-        count = sch.count
-        all_schedules: List[TasksGroup] = []
-        for _ in range(sch.count):
-            s_ = copy.deepcopy(sch)
-            s_.count = 1
-            all_schedules.append(s_)
         workers = []
-        for node_idx, i in enumerate(range(0, count, n_gpus_per_node)):
-            _schedules = all_schedules[i : i + n_gpus_per_node]
-            for _idx, sch in enumerate(_schedules):
-                # Schedule jobs one-by-one to maintain the order on remote nodes.
+        if sch.scheduling.gpu > 0 and n_nodes > 1:
+            # When # nodes > 1, for GPU workers, schedule them in granularity of nodes.
+            assert (
+                n_gpus_per_node % sch.scheduling.gpu == 0
+            ), f"Each node should be allocated with identical numbers of {worker_type}."
+            n_worker_per_node = int(n_gpus_per_node / sch.scheduling.gpu)
+            assert sch.count % n_worker_per_node == 0, (
+                f"Total {worker_type} count ({sch.count}) should be divisible by "
+                f"the number of workers per node ({n_worker_per_node})."
+            )
+            n_nodes = int(sch.count / n_worker_per_node)
+            placement_group = ray.util.placement_group(
+                bundles=[
+                    {
+                        "CPU": sch.scheduling.cpu * n_worker_per_node,
+                        "GPU": sch.scheduling.gpu * n_worker_per_node,
+                        "memory": sch.scheduling.mem
+                        * 1024**2
+                        * n_worker_per_node,  # in bytes
+                    }
+                ]
+                * n_nodes,
+            )
+            try:
+                ray.get(placement_group.ready(), timeout=30)
+            except ray.exceptions.GetTimeoutError:
+                logger.critical(
+                    f"Failed to create placement group for {worker_type}s. "
+                    f"Please make sure at least {n_nodes} node "
+                    f"has resources for {n_worker_per_node} {worker_type}s."
+                )
+            for node_id in range(n_nodes):
+                # Use a customized packed scheduling method
+                # that sequentially allocates nodes.
+                for i in range(n_worker_per_node):
+                    _idx = node_id * n_worker_per_node + i
+                    worker = RayWorker.options(
+                        name=f"{worker_type}/{_idx}",
+                        num_cpus=sch.scheduling.cpu,
+                        num_gpus=sch.scheduling.gpu,
+                        memory=sch.scheduling.mem * 1024**2,
+                        scheduling_strategy=PlacementGroupSchedulingStrategy(
+                            placement_group=placement_group,
+                            placement_group_bundle_index=node_id,
+                            placement_group_capture_child_tasks=True,
+                        ),
+                    ).remote(
+                        args=exp_cfg,
+                        worker_type=worker_type,
+                        worker_cls=load_worker(worker_type),
+                        kv_store_name=ray_kv_store_name,
+                    )
+                    workers.append(worker)
+        else:
+            # Schedule them with SPREAD strategy when
+            # 1. CPU workers when n_nodes > 1,
+            #    to save as much resource as possible on nodes for GPU workers.
+            # 2. all workers when n_nodes = 1
+            for _idx in range(sch.count):
                 worker = RayWorker.options(
-                    name=f"{worker_type}/{_idx + i}",
+                    name=f"{worker_type}/{_idx}",
                     num_cpus=sch.scheduling.cpu,
                     num_gpus=sch.scheduling.gpu,
                     memory=sch.scheduling.mem * 1024**2,
+                    scheduling_strategy="SPREAD",
                 ).remote(
                     args=exp_cfg,
                     worker_type=worker_type,
@@ -222,7 +275,6 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
                     kv_store_name=ray_kv_store_name,
                 )
                 workers.append(worker)
         all_workers[worker_type] = workers
 
     try: