Merge branch 'main' into lite

Wei Fu 2025-07-24 19:24:38 +08:00 committed by GitHub
commit aa6c28ed24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 79 additions and 25 deletions

View File

@@ -1,5 +1,5 @@
<h1 align="center">
<em>AReaL</em> v0.2: Training a SOTA 7B LRM with 1.5x Throughput Improvment
<em>AReaL</em> v0.2: Training a SOTA 7B LRM with 1.5x Throughput Improvement
</h1>
<p align="center" style="font-size: 0.8em; color: #666;">

View File

@@ -1,4 +1,6 @@
# AReaL v0.3
<h1 align="center">
<em>AReaL</em> v0.3: SOTA Coding Models with 2.77x Faster Asynchronous RL Training
</h1>
## Introduction

View File

@@ -11,6 +11,7 @@ from typing import Any, List
import psutil
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from realhf.api.cli_args import NameResolveConfig
from realhf.api.core.system_api import Experiment, ExperimentScheduling, TasksGroup
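Among the imports shown above, `PlacementGroupSchedulingStrategy` is the one the scheduling changes further down rely on to pin worker actors onto specific nodes. Those changes also build on Ray's basic actor pattern, in which a remote class is configured through `.options(...)` (resources, name, scheduling strategy) and instantiated through `.remote(...)`. A minimal sketch of that pattern, using a toy `EchoWorker` in place of the repository's `RayWorker` and an arbitrary worker name:

```python
import ray

ray.init()  # the launcher instead connects to an existing cluster


@ray.remote
class EchoWorker:
    """Toy stand-in for RayWorker; it only echoes its configuration."""

    def __init__(self, worker_type: str, index: int):
        self.worker_type = worker_type
        self.index = index

    def describe(self) -> str:
        return f"{self.worker_type}/{self.index}"


# .options() sets per-actor resources; .remote() passes constructor arguments.
worker = EchoWorker.options(
    name="rollout_worker/0",  # illustrative name only
    num_cpus=1,
    num_gpus=0,
    memory=512 * 1024**2,  # Ray expects memory in bytes
).remote(worker_type="rollout_worker", index=0)

print(ray.get(worker.describe.remote()))  # -> "rollout_worker/0"
```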
@@ -167,6 +168,19 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
scheduling: ExperimentScheduling = exp_cfg.scheduling_setup()
# We assume all nodes have the same resources (CPU, GPU, memory).
all_available_resources = ray.available_resources()
all_available_nodes = [
k
for k in all_available_resources
if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
]
n_nodes = len(all_available_nodes)
n_gpus_per_node = int(all_available_resources["GPU"] // n_nodes)
assert (
all_available_resources["GPU"] % n_nodes == 0
), "AReaL assumes all nodes has the same number of GPUs."
for worker_type in WORKER_TYPES:
sch = getattr(scheduling, worker_type)
if sch is None:
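The block above derives the cluster shape purely from Ray's resource accounting: every node registers a pseudo-resource named `node:<ip>`, so counting those keys gives the node count, and dividing the aggregate `GPU` resource by it gives the per-node GPU count under the homogeneity assumption. A self-contained sketch of the same discovery logic, assuming a running Ray cluster reachable via `ray.init(address="auto")`:

```python
import re

import ray

# Assumes a Ray cluster is already running (e.g. started with `ray start`).
ray.init(address="auto")

resources = ray.available_resources()
# Ray exposes one pseudo-resource per node, keyed as "node:<ip-address>",
# alongside aggregate counters such as "CPU" and "GPU".
node_keys = [
    k for k in resources if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
]
n_nodes = len(node_keys)

# Homogeneity assumption: the total GPU count must split evenly across nodes.
assert resources["GPU"] % n_nodes == 0, "nodes have different GPU counts"
n_gpus_per_node = int(resources["GPU"] // n_nodes)
print(f"{n_nodes} nodes, {n_gpus_per_node} GPUs per node")
```

The IPv4-shaped pattern also filters out any non-address `node:` entries Ray may expose.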
@@ -187,34 +201,73 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
f"Please launch more Ray nodes otherwise the experiment will get stuck."
)
# Use a customized packed scheduling method
# that sequentially allocates nodes.
available_nodes = [
k
for k in available_resources
if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
]
total_gpus = available_resources["GPU"]
n_gpus_per_node = int(total_gpus // len(available_nodes))
count = sch.count
all_schedules: List[TasksGroup] = []
for _ in range(sch.count):
s_ = copy.deepcopy(sch)
s_.count = 1
all_schedules.append(s_)
workers = []
if sch.scheduling.gpu > 0 and n_nodes > 1:
# When n_nodes > 1, schedule GPU workers at the granularity of whole nodes.
assert (
n_gpus_per_node % sch.scheduling.gpu == 0
), f"Each node should be allocated with identical numbers of {worker_type}."
n_worker_per_node = int(n_gpus_per_node / sch.scheduling.gpu)
assert sch.count % n_worker_per_node == 0, (
f"Total {worker_type} count ({sch.count}) should be divisible by "
f"the number of workers per node ({n_worker_per_node})."
)
n_nodes = int(sch.count / n_worker_per_node)
placement_group = ray.util.placement_group(
bundles=[
{
"CPU": sch.scheduling.cpu * n_worker_per_node,
"GPU": sch.scheduling.gpu * n_worker_per_node,
"memory": sch.scheduling.mem
* 1024**2
* n_worker_per_node, # in bytes
}
]
* n_nodes,
)
try:
ray.get(placement_group.ready(), timeout=30)
except ray.exceptions.GetTimeoutError:
logger.critical(
f"Failed to create placement group for {worker_type}s. "
f"Please make sure at least {n_nodes} node "
f"has resources for {n_worker_per_node} {worker_type}s."
)
for node_idx, i in enumerate(range(0, count, n_gpus_per_node)):
_schedules = all_schedules[i : i + n_gpus_per_node]
for _idx, sch in enumerate(_schedules):
# Schedule jobs one-by-one to maintain the order on remote nodes.
for node_id in range(n_nodes):
# Use a customized packed scheduling method
# that sequentially allocates nodes.
for i in range(n_worker_per_node):
_idx = node_id * n_worker_per_node + i
worker = RayWorker.options(
name=f"{worker_type}/{_idx}",
num_cpus=sch.scheduling.cpu,
num_gpus=sch.scheduling.gpu,
memory=sch.scheduling.mem * 1024**2,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=placement_group,
placement_group_bundle_index=node_id,
placement_group_capture_child_tasks=True,
),
).remote(
args=exp_cfg,
worker_type=worker_type,
worker_cls=load_worker(worker_type),
kv_store_name=ray_kv_store_name,
)
workers.append(worker)
else:
# Use the SPREAD strategy for:
# 1. CPU-only workers when n_nodes > 1, to leave as many node resources
#    as possible for GPU workers;
# 2. all workers when n_nodes == 1.
for _idx in range(sch.count):
worker = RayWorker.options(
name=f"{worker_type}/{_idx + i}",
name=f"{worker_type}/{_idx}",
num_cpus=sch.scheduling.cpu,
num_gpus=sch.scheduling.gpu,
memory=sch.scheduling.mem * 1024**2,
scheduling_strategy="SPREAD",
).remote(
args=exp_cfg,
worker_type=worker_type,
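The GPU branch above reserves one placement-group bundle per node, sized to hold that node's full complement of workers, waits up to 30 seconds for the bundles to be granted, and then pins each worker to its node's bundle via `placement_group_bundle_index`, with the global index computed as `node_id * n_worker_per_node + i`. A compact sketch of the same pattern with hypothetical sizes (2 nodes, 8 GPUs per node, 1 GPU per worker) and a toy `Worker` actor in place of `RayWorker`; the `model_worker/...` names are illustrative only:

```python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(address="auto")  # assumes an existing multi-node cluster

# Hypothetical sizing: 2 nodes, 8 GPUs per node, 1 GPU per worker.
n_nodes, n_worker_per_node = 2, 8
cpu, gpu, mem_mb = 4, 1, 10 * 1024  # per-worker resources

# One bundle per node, sized to hold that node's full set of workers.
pg = ray.util.placement_group(
    bundles=[
        {
            "CPU": cpu * n_worker_per_node,
            "GPU": gpu * n_worker_per_node,
            "memory": mem_mb * 1024**2 * n_worker_per_node,  # bytes
        }
    ]
    * n_nodes,
)

# Fail fast if the cluster cannot grant one such bundle per node.
try:
    ray.get(pg.ready(), timeout=30)
except ray.exceptions.GetTimeoutError:
    raise RuntimeError("cluster too small for one bundle per node") from None


@ray.remote
class Worker:  # toy stand-in for RayWorker
    def ping(self) -> str:
        return "ok"


workers = []
for node_id in range(n_nodes):
    for i in range(n_worker_per_node):
        idx = node_id * n_worker_per_node + i  # global worker index
        w = Worker.options(
            name=f"model_worker/{idx}",
            num_cpus=cpu,
            num_gpus=gpu,
            memory=mem_mb * 1024**2,
            scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=pg,
                placement_group_bundle_index=node_id,  # pin to this node's bundle
                placement_group_capture_child_tasks=True,
            ),
        ).remote()
        workers.append(w)

print(ray.get([w.ping.remote() for w in workers[:2]]))
```

Pinning by bundle index is what guarantees that each group of `n_worker_per_node` workers lands on the same node, rather than merely being spread across whatever nodes happen to have capacity.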
@@ -222,7 +275,6 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
kv_store_name=ray_kv_store_name,
)
workers.append(worker)
all_workers[worker_type] = workers
try:
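The `else` branch, by contrast, hands placement back to Ray's built-in `"SPREAD"` strategy, which distributes actors across nodes on a best-effort basis; this covers CPU-only workers on multi-node clusters (so no single node is drained of resources that GPU workers need) and all workers on a single-node cluster. A minimal sketch under the same cluster assumptions as above; `master_worker` is an arbitrary name:

```python
import ray

ray.init(address="auto")  # assumes an existing cluster


@ray.remote
class CpuWorker:  # toy stand-in for a CPU-only RayWorker
    def ping(self) -> str:
        return "ok"


# "SPREAD" asks Ray to distribute actors across nodes on a best-effort basis,
# so CPU-only workers do not pile up on a node that GPU workers may need.
workers = [
    CpuWorker.options(
        name=f"master_worker/{idx}",
        num_cpus=2,
        num_gpus=0,
        memory=1024 * 1024**2,  # bytes
        scheduling_strategy="SPREAD",
    ).remote()
    for idx in range(4)
]
print(ray.get([w.ping.remote() for w in workers]))
```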