From ef8b340ea78b27942b533ad8994e2c6d5c76b437 Mon Sep 17 00:00:00 2001
From: "Richard.Bian"
Date: Thu, 17 Jul 2025 16:29:34 +0800
Subject: [PATCH 1/2] nit: fix typo in blog v0.2 release (#182)

Nitpick fix for the title in the blog md file AReaL_v0_2.md
---
 blog/AReaL_v0_2.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/blog/AReaL_v0_2.md b/blog/AReaL_v0_2.md
index aa03561..f6d2cea 100644
--- a/blog/AReaL_v0_2.md
+++ b/blog/AReaL_v0_2.md
@@ -1,5 +1,5 @@

-AReaL v0.2: Training a SOTA 7B LRM with 1.5x Throughput Improvment
+AReaL v0.2: Training a SOTA 7B LRM with 1.5x Throughput Improvement

From 0d45f43285c7d942d80cddc3aa3f39bb1621bd67 Mon Sep 17 00:00:00 2001
From: nuzant
Date: Thu, 17 Jul 2025 16:30:10 +0800
Subject: [PATCH 2/2] [Bug] Fix a bug in ray training utils. (#184)

* fix ray schedule

* .

* .

* remove debug info
---
 training/utils.py | 95 +++++++++++++++++++++++++++++++++++------------
 1 file changed, 72 insertions(+), 23 deletions(-)

diff --git a/training/utils.py b/training/utils.py
index e5d409b..90a6d61 100644
--- a/training/utils.py
+++ b/training/utils.py
@@ -11,6 +11,7 @@ from typing import Any, List
 
 import psutil
 import ray
+from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
 
 from realhf.api.cli_args import NameResolveConfig
 from realhf.api.core.system_api import Experiment, ExperimentScheduling, TasksGroup
@@ -167,6 +168,18 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
 
     scheduling: ExperimentScheduling = exp_cfg.scheduling_setup()
 
+    # We assume all nodes have the same resources (CPU, GPU, memory).
+    all_available_resources = ray.available_resources()
+    all_available_nodes = [
+        k
+        for k in all_available_resources
+        if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
+    ]
+    n_gpus_per_node = int(all_available_resources["GPU"] // len(all_available_nodes))
+    assert (
+        all_available_resources["GPU"] % len(all_available_nodes) == 0
+    ), "AReaL assumes all nodes have the same number of GPUs."
+
     for worker_type in WORKER_TYPES:
         sch = getattr(scheduling, worker_type)
         if sch is None:
@@ -187,34 +200,71 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
                 f"Please launch more Ray nodes otherwise the experiment will get stuck."
             )
 
-        # Use a customized packed scheduling method
-        # that sequentially allocates nodes.
-        available_nodes = [
-            k
-            for k in available_resources
-            if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
-        ]
-        total_gpus = available_resources["GPU"]
-        n_gpus_per_node = int(total_gpus // len(available_nodes))
-
-        count = sch.count
-        all_schedules: List[TasksGroup] = []
-        for _ in range(sch.count):
-            s_ = copy.deepcopy(sch)
-            s_.count = 1
-            all_schedules.append(s_)
         workers = []
+        if sch.scheduling.gpu > 0:
+            # For GPU workers, schedule them at the granularity of nodes.
+            assert (
+                n_gpus_per_node % sch.scheduling.gpu == 0
+            ), f"Each node should be allocated an identical number of {worker_type}s."
+            n_worker_per_node = int(n_gpus_per_node / sch.scheduling.gpu)
+            assert sch.count % n_worker_per_node == 0, (
+                f"Total {worker_type} count ({sch.count}) should be divisible by "
+                f"the number of workers per node ({n_worker_per_node})."
+            )
+            n_nodes = int(sch.count / n_worker_per_node)
+            placement_group = ray.util.placement_group(
+                bundles=[
+                    {
+                        "CPU": sch.scheduling.cpu * n_worker_per_node,
+                        "GPU": sch.scheduling.gpu * n_worker_per_node,
+                        "memory": sch.scheduling.mem
+                        * 1024**2
+                        * n_worker_per_node,  # in bytes
+                    }
+                ]
+                * n_nodes,
+            )
+            try:
+                ray.get(placement_group.ready(), timeout=30)
+            except ray.exceptions.GetTimeoutError:
+                logger.critical(
+                    f"Failed to create placement group for {worker_type}s. "
+                    f"Please make sure at least {n_nodes} nodes "
+                    f"have resources for {n_worker_per_node} {worker_type}s."
+                )
 
-        for node_idx, i in enumerate(range(0, count, n_gpus_per_node)):
-            _schedules = all_schedules[i : i + n_gpus_per_node]
-            for _idx, sch in enumerate(_schedules):
-                # Schedule jobs one-by-one to maintain the order on remote nodes.
+            for node_id in range(n_nodes):
+                # Use a customized packed scheduling method
+                # that sequentially allocates nodes.
+                for i in range(n_worker_per_node):
+                    _idx = node_id * n_worker_per_node + i
+                    worker = RayWorker.options(
+                        name=f"{worker_type}/{_idx}",
+                        num_cpus=sch.scheduling.cpu,
+                        num_gpus=sch.scheduling.gpu,
+                        memory=sch.scheduling.mem * 1024**2,
+                        scheduling_strategy=PlacementGroupSchedulingStrategy(
+                            placement_group=placement_group,
+                            placement_group_bundle_index=node_id,
+                            placement_group_capture_child_tasks=True,
+                        ),
+                    ).remote(
+                        args=exp_cfg,
+                        worker_type=worker_type,
+                        worker_cls=load_worker(worker_type),
+                        kv_store_name=ray_kv_store_name,
+                    )
+                    workers.append(worker)
+        else:
+            # For CPU workers, schedule them with the SPREAD strategy
+            # to save as many resources as possible on nodes for GPU workers.
+            for _idx in range(sch.count):
                 worker = RayWorker.options(
-                    name=f"{worker_type}/{_idx + i}",
+                    name=f"{worker_type}/{_idx}",
                     num_cpus=sch.scheduling.cpu,
                     num_gpus=sch.scheduling.gpu,
                     memory=sch.scheduling.mem * 1024**2,
+                    scheduling_strategy="SPREAD",
                 ).remote(
                     args=exp_cfg,
                     worker_type=worker_type,
@@ -222,7 +272,6 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
                     kv_store_name=ray_kv_store_name,
                 )
                 workers.append(worker)
-
         all_workers[worker_type] = workers
 
         try:
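
Note: the following is an illustrative, standalone sketch of the scheduling technique used in PATCH 2/2, not part of either patch. It shows the general idea of reserving one Ray placement-group bundle per node and pinning each actor to its node's bundle via PlacementGroupSchedulingStrategy, which is how the patch packs GPU workers node by node. The names DemoWorker, n_nodes, and n_workers_per_node, and the per-worker resource numbers, are hypothetical placeholders.

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


@ray.remote
class DemoWorker:
    # Hypothetical stand-in for the RayWorker actor in the patch.
    def node_ip(self):
        return ray.util.get_node_ip_address()


ray.init()

n_nodes = 2              # hypothetical: number of GPU nodes to reserve
n_workers_per_node = 4   # hypothetical: workers packed onto each node

# One bundle per node, each sized for a full node's worth of workers,
# mirroring how the patch builds its bundles.
pg = placement_group(
    bundles=[{"CPU": n_workers_per_node, "GPU": n_workers_per_node}] * n_nodes,
)
ray.get(pg.ready(), timeout=30)

workers = []
for node_id in range(n_nodes):
    for i in range(n_workers_per_node):
        # bundle_index=node_id pins every worker of this node to the same bundle,
        # so the worker ordering maps deterministically onto nodes.
        w = DemoWorker.options(
            num_cpus=1,
            num_gpus=1,
            scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=pg,
                placement_group_bundle_index=node_id,
            ),
        ).remote()
        workers.append(w)

# Workers on the same node report the same IP, in node order.
print(ray.get([w.node_ip.remote() for w in workers]))

# CPU-only helpers can instead use scheduling_strategy="SPREAD" (as the patch
# does) so they spread across nodes and leave GPU-node resources free.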