add async doc (#67)

This commit is contained in:
Wei Fu 2025-06-02 15:59:50 +08:00 committed by GitHub
parent ac7a9c5a89
commit ab20f940f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 125 additions and 28 deletions

View File

@ -14,13 +14,19 @@ parts:
chapters:
- file: customization/dataset
- file: customization/agent
- caption: Developer Manual
- caption: Code Walkthrough
chapters:
- file: developer/exp_launch
- file: developer/master_worker
- file: developer/model_worker
- file: developer/algo_interface
- file: developer/allocation_parallel
- file: developer/overview
- file: developer/trainer
sections:
- file: developer/trainer/master_worker
- file: developer/trainer/model_worker
- file: developer/trainer/algo_interface
- file: developer/trainer/allocation_parallel
- file: developer/rollout
sections:
- file: developer/rollout/gserver
- file: developer/rollout/rollout_worker
- caption: References
chapters:
- file: references/benchmark

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

View File

@ -1,3 +0,0 @@
# Launching Procedure
![Illustration of Experiment Launching](launch.png)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 130 KiB

View File

@ -0,0 +1,15 @@
# Overview
![overview](areal_code_arch.png)
In the launching script, the CLI arguments are first converted into an experiment configuration of type `AsyncPPOMathConfig`. We then call the `initial_setup` method of the experiment configuration to obtain the worker configurations required for the experiment. Next, we launch several types of workers.
Among these workers, `GenerationServer`, `RolloutWorker`, and `GserverManager` are responsible for rollout in asynchronous RL. `ModelWorker` and `MasterWorker` are responsible for training.
Workers independently execute the `_poll` or `_poll_async` method in a while loop, where the core logic of these workers is implemented.
```{note}
For SFT and synchronous PPO, only trainer workers are launched.
```
For asynchronous RL, the trainer side treats the rollout side as "datasets". The "dataset" does not load from disk but pulls data from a TCP socket, implemented in `stream_dataset.py`. This approach unifies offline training workflows (e.g., SFT) and online RL workflows (e.g., PPO) by simply specifying different dataset types.

View File

@ -0,0 +1,5 @@
# Rollout
[Generation Server and Manager](rollout/gserver.md)
[Rollout Worker](rollout/rollout_worker.md)

Binary file not shown.

After

Width:  |  Height:  |  Size: 130 KiB

View File

@ -0,0 +1,25 @@
# Generation Server and Manager
## Server
The `GenerationServer` launches an SGLang subprocess during setup and writes the server's address to `name_resolve` so that other workers can discover it. It serves as a thin wrapper over SGLang's `launch_server` command.
Request scheduling and parameter versioning across different servers are managed by the `GserverManager`, which we'll introduce in the next section.
## Manager
![manager](areal_rollout_manager.png)
### Server Thread
The `GserverManager` launches a `fastapi` server thread for rollout workers to invoke. There are three main types of APIs:
+ **allocate_request**: The manager decides whether to allow submission of a new rollout request based on capacity and data staleness
+ **schedule_request**: The manager routes initial or interrupted requests to a generation server address
+ **finish_rollout**: The manager releases the slot and allows allocation of new rollouts
Since the generation of each trajectory must pass through the centralized manager, we can conveniently control data staleness. For details on staleness control, refer to the `is_staled` method of `GserverManager`.
### Main Thread
The main thread of `GserverManager` executes the `_poll` method, which primarily checks whether new weights are available. When new weights are detected, it sends update weight requests to all generation servers to interrupt ongoing requests and update the weights to the latest version.

View File

@ -0,0 +1,19 @@
# Rollout Worker
![rollout_worker](rollout_worker.png)
The rollout worker runs these jobs in independent asyncio coroutines:
+ **Load and execute trajectories**: Load a new prompt and run `agent.collect_trajectory()`. The agent steps through the environment, pushes new observations into `obs_queue`, and waits for new actions from `act_queue`. The user can provide customized implementation for the agent.
+ **Process inference requests**: Handle requests produced by agents and obtained from `obs_queue`. After inference completes, it sends actions via `act_queue` back to the agent to continue the rollout.
+ **Submit completed trajectories**: Push complete trajectories to the trainer.
For each agent, the timeline is straightforward: it iteratively puts the latest observation into `obs_queue`, waits for an action from `act_queue`, and runs `env.step` to get the next observation until a full trajectory is collected. The rollout worker runs many agents concurrently, switching between them when some are idle and waiting for remote responses.
The rollout worker typically doesn't perform actual computation—it serves as a client for both the generation server and the environment service.
```{note}
Although we expect `EnvironmentService` to be a remote service, it can be implemented as a pure python function. In this case, the rollout worker process will handle the computation for `env.step`.
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 105 KiB

View File

@ -0,0 +1,9 @@
# Trainer
[Master Worker](trainer/master_worker.md)
[Model Worker](trainer/model_worker.md)
[Algorithm Interface](trainer/algo_interface.md)
[Allocation & Parallelism](trainer/allocation_parallel.md)

View File

Before

Width:  |  Height:  |  Size: 338 KiB

After

Width:  |  Height:  |  Size: 338 KiB

View File

Before

Width:  |  Height:  |  Size: 162 KiB

After

Width:  |  Height:  |  Size: 162 KiB

View File

Before

Width:  |  Height:  |  Size: 389 KiB

After

Width:  |  Height:  |  Size: 389 KiB

View File

Before

Width:  |  Height:  |  Size: 260 KiB

After

Width:  |  Height:  |  Size: 260 KiB

View File

@ -3,9 +3,9 @@
## Overview
![](master_arch.png)
The worker architecture of AReaL consists of a single master worker coordinating multiple model workers.
The trainer part of AReaL consists of a single master worker coordinating multiple model workers.
An RL algorithm typically contains several model function calls (MFCs) that need to be executed in a certain order. For example in PPO,
An RL algorithm typically contains several model function calls (MFCs) that need to be executed in a certain order. For example in (synchronous) PPO,
1. `actor_gen` generates responses given a batch of user prompts;
2. `ref_inf` computes the log-probabilities of the tokens under the reference policy;
@ -14,6 +14,12 @@ An RL algorithm typically contains several model function calls (MFCs) that need
Here model function calls 2 and 3 depends on the output of 1. Model function call 4 depends on the outputs of 1, 2, and 3.
```{note}
The `actor_gen` and `rew_inf` MFC are eliminated for asynchronous RL and executed on the rollout side in a streaming manner.
The master worker will only see consequent MFCs. The results of previous MFCs will be loaded from the `StreamDataset`.
```
The MFCs are coordinated by a `FunctionExecutor` instance. It creates a `ModelFunctionCall` instance for each MFC. The actual computation is performed on model workers via remote procedure call.
## Buffer and MFC Execution Order

View File

@ -1,29 +1,38 @@
# Model Worker
## Master-Model Worker Interaction
The master worker sends remote procedure calls (RPCs) to model workers to execute actual computations like `actor_gen` and `actor_train`. The figure below illustrates their interaction throughout an experiment:
The master worker sends remote procedure calls (RPCs) to model workers to execute actual computations like `actor_train`. The figure below illustrates their interaction throughout an experiment:
![](master-model-interaction.png)
Model worker "compute" involves running a model interface with a specific backend (covered in detail later). For PPO algorithms, model workers sequentially execute:
Model worker "compute" involves running a model interface with a specific backend (covered in detail later). For synchronous PPO algorithms, model workers sequentially execute:
+ `actor_gen`: `actor` model with SGlang backend + `PPOActorInterface.generate`
+ `actor_gen`: `actor` model with SGLang backend + `PPOActorInterface.generate`
+ `rew_inf`: `reward` model (can be null for RLVR) + `MultiTaskRewardInterface.inference`
+ `actor_train`: `actor` model with Megatron backend + `PPOActorInterface.train_step`
## Communication Protocol
### Request-Reply Pattern
The master worker and model workers communicate through a `request_reply_stream` channel that handles requests and metadata responses (actual data like `input_ids` transfers through other channels).
```{note}
For asynchronous PPO, only `actor_train` is executed.
```
Master (client) can send these requests to model workers (servers):
## Communication Protocol
### Request-Reply Pattern
The master worker and model workers communicate through a `request_reply_stream` channel that handles requests and metadata responses. Actual data like `input_ids` transfers through other channels.
The master (client) can send these requests to model workers (servers):
+ **fetch**: Worker loads local dataset data and sends metadata (e.g., sequence length) to master for buffer storage
+ **spec**: Worker returns dataset specifications for master to calculate experiment steps
+ **model_config**: Worker provides transformer model configuration
+ **clear_data_cache**: Worker clears data transfer and GPU caches
+ **initialize**: Worker initializes parameters, gradient buffers, and optimizer states
+ **generate/inference/train_step**: Worker executes corresponding computation (note: "inference" refers to single forward pass)
+ **generate/inference/train_step**: Worker executes corresponding computation (note: "inference" refers to a single forward pass)
### Request Hooks
Computation requests ("generate"/"inference"/"train_step") support pre- and post-hooks for:
+ Data transfer (pre-hook)
@ -32,18 +41,21 @@ Computation requests ("generate"/"inference"/"train_step") support pre- and post
+ Parameter reallocation
+ Checkpointing (post-hooks)
These hooks often require NCCL communication/synchronization between workers. Implementing them as dedicated hooks prevents deadlocks that could occur if these operations interleaved with other NCCL communications.
These hooks often require NCCL communication and synchronization between workers. Implementing them as dedicated hooks prevents deadlocks that could occur if these operations were interleaved with other NCCL communications.
### Request Types
+ **Blocking requests**: Long-running operations requiring NCCL synchronization. Workers can't execute immediately since concurrent blocking requests may need coordinated data transfers. Master sends a "flush" request to indicate all concurrent requests have been sent.
+ **Blocking requests**: Long-running operations requiring NCCL synchronization. Workers can't execute immediately since concurrent blocking requests may need coordinated data transfers. The master sends a "flush" request to indicate that all concurrent requests have been sent.
+ **Non-blocking requests**: Shorter operations without NCCL requirements that can execute immediately.
## Data Management
### Distributed Dataset Storage
Datasets distribute across model workers without overlap. For each model:
Datasets are distributed across model workers without overlap. For each model:
+ Processes with PP rank = -1 and TP rank = 0 serve as DP heads
+ Data stores on DP heads of the model used in the first MFC (e.g., actor model DP heads for PPO)
+ Data is stored on DP heads of the model used in the first MFC (e.g., actor model DP heads for PPO)
During "fetch" requests:
@ -51,7 +63,12 @@ During "fetch" requests:
2. Sends metadata to master
3. Master tracks metadata and later instructs workers which data to use for each MFC via computation request hooks
```{note}
For asynchronous RL, the "dataset" will be a `StreamDataset` instance that pulls data from the rollout worker. After data is loaded, the subsequent MFC calls follow the same procedure as described above.
```
### Data Transfer Process
For each MFC, the master:
1. Specifies which data to use
@ -60,14 +77,12 @@ For each MFC, the master:
- `Redistributor`: Generates NCCL broadcast/gather/scatter communication plan
- `DataManager`: Executes the plan
After redistribution, workers with same DP rank receive identical input data.
After redistribution, workers with the same DP rank receive identical input data.
### MFC Output Handling
Only workers with PP rank=-1 and TP rank=0 produce output data. These workers:
1. Store data locally
2. Notify master of data locations
3. Master generates new redistribution plans for subsequent MFCs based on this layout information
3. Master generates new redistribution plans for subsequent MFCs based on this layout information

View File

Before

Width:  |  Height:  |  Size: 24 KiB

After

Width:  |  Height:  |  Size: 24 KiB