mirror of https://github.com/inclusionAI/AReaL
[Bug] Fix a bug in ray training utils. (#184)
* fix ray schedule
* remove debug info
This commit is contained in:
  parent ef8b340ea7
  commit 0d45f43285
@@ -11,6 +11,7 @@ from typing import Any, List
 import psutil
 import ray
+from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

 from realhf.api.cli_args import NameResolveConfig
 from realhf.api.core.system_api import Experiment, ExperimentScheduling, TasksGroup
@@ -167,6 +168,18 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
     scheduling: ExperimentScheduling = exp_cfg.scheduling_setup()

+    # We assume all nodes have the same resources (CPU, GPU, memory).
+    all_available_resources = ray.available_resources()
+    all_available_nodes = [
+        k
+        for k in all_available_resources
+        if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
+    ]
+    n_gpus_per_node = int(all_available_resources["GPU"] // len(all_available_nodes))
+    assert (
+        all_available_resources["GPU"] % len(all_available_nodes) == 0
+    ), "AReaL assumes all nodes have the same number of GPUs."
+
     for worker_type in WORKER_TYPES:
         sch = getattr(scheduling, worker_type)
         if sch is None:
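The block added above derives the per-node GPU count from Ray's cluster-wide resource view, where every alive node shows up as a pseudo-resource keyed "node:<ip-address>". A minimal standalone sketch of the same idea (the `ray.init(address="auto")` call and the printed summary are illustrative, not part of the commit):

    # Sketch: compute GPUs per node from Ray's resource report.
    # Assumes a homogeneous cluster, as the added assert does.
    import re

    import ray

    ray.init(address="auto")  # attach to a running Ray cluster (assumption)

    resources = ray.available_resources()
    # One "node:<ip>" key per alive node.
    node_keys = [
        k for k in resources if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
    ]
    total_gpus = resources.get("GPU", 0)
    assert total_gpus % len(node_keys) == 0, "nodes are assumed to have equal GPU counts"
    n_gpus_per_node = int(total_gpus // len(node_keys))
    print(f"{len(node_keys)} nodes, {n_gpus_per_node} GPUs per node")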
@@ -187,34 +200,71 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
                 f"Please launch more Ray nodes otherwise the experiment will get stuck."
             )

-        # Use a customized packed scheduling method
-        # that sequentially allocates nodes.
-        available_nodes = [
-            k
-            for k in available_resources
-            if re.match(r"node:(\b(?:\d{1,3}\.){3}\d{1,3}\b)", k)
-        ]
-        total_gpus = available_resources["GPU"]
-        n_gpus_per_node = int(total_gpus // len(available_nodes))
-
-        count = sch.count
-        all_schedules: List[TasksGroup] = []
-        for _ in range(sch.count):
-            s_ = copy.deepcopy(sch)
-            s_.count = 1
-            all_schedules.append(s_)
-
         workers = []
+        if sch.scheduling.gpu > 0:
+            # For GPU workers, schedule them in granularity of nodes.
+            assert (
+                n_gpus_per_node % sch.scheduling.gpu == 0
+            ), f"Each node should be allocated with identical numbers of {worker_type}."
+            n_worker_per_node = int(n_gpus_per_node / sch.scheduling.gpu)
+            assert sch.count % n_worker_per_node == 0, (
+                f"Total {worker_type} count ({sch.count}) should be divisible by "
+                f"the number of workers per node ({n_worker_per_node})."
+            )
+            n_nodes = int(sch.count / n_worker_per_node)
+            placement_group = ray.util.placement_group(
+                bundles=[
+                    {
+                        "CPU": sch.scheduling.cpu * n_worker_per_node,
+                        "GPU": sch.scheduling.gpu * n_worker_per_node,
+                        "memory": sch.scheduling.mem
+                        * 1024**2
+                        * n_worker_per_node,  # in bytes
+                    }
+                ]
+                * n_nodes,
+            )
+            try:
+                ray.get(placement_group.ready(), timeout=30)
+            except ray.exceptions.GetTimeoutError:
+                logger.critical(
+                    f"Failed to create placement group for {worker_type}s. "
+                    f"Please make sure at least {n_nodes} node "
+                    f"has resources for {n_worker_per_node} {worker_type}s."
+                )
+
-        for node_idx, i in enumerate(range(0, count, n_gpus_per_node)):
-            _schedules = all_schedules[i : i + n_gpus_per_node]
-            for _idx, sch in enumerate(_schedules):
-                # Schedule jobs one-by-one to maintain the order on remote nodes.
+            for node_id in range(n_nodes):
+                # Use a customized packed scheduling method
+                # that sequentially allocates nodes.
+                for i in range(n_worker_per_node):
+                    _idx = node_id * n_worker_per_node + i
+                    worker = RayWorker.options(
+                        name=f"{worker_type}/{_idx}",
+                        num_cpus=sch.scheduling.cpu,
+                        num_gpus=sch.scheduling.gpu,
+                        memory=sch.scheduling.mem * 1024**2,
+                        scheduling_strategy=PlacementGroupSchedulingStrategy(
+                            placement_group=placement_group,
+                            placement_group_bundle_index=node_id,
+                            placement_group_capture_child_tasks=True,
+                        ),
+                    ).remote(
+                        args=exp_cfg,
+                        worker_type=worker_type,
+                        worker_cls=load_worker(worker_type),
+                        kv_store_name=ray_kv_store_name,
+                    )
+                    workers.append(worker)
+        else:
+            # For CPU workers, schedule them with SPREAD strategy
+            # to save as much resource as possible on nodes for GPU workers.
+            for _idx in range(sch.count):
                 worker = RayWorker.options(
-                    name=f"{worker_type}/{_idx + i}",
+                    name=f"{worker_type}/{_idx}",
                     num_cpus=sch.scheduling.cpu,
                     num_gpus=sch.scheduling.gpu,
                     memory=sch.scheduling.mem * 1024**2,
                     scheduling_strategy="SPREAD",
                 ).remote(
                     args=exp_cfg,
                     worker_type=worker_type,
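The heart of the fix is the GPU branch above: instead of relying on SPREAD, it reserves one placement-group bundle per node and pins the workers destined for node `node_id` to bundle `node_id`, so workers that should share a machine really land together. A simplified, self-contained sketch of that pattern (the `EchoWorker` actor, resource sizes, and node counts are made up for illustration; only the Ray calls mirror the commit):

    # Sketch: pack GPU actors node-by-node with a placement group,
    # pinning each actor to its node's bundle via the bundle index.
    import ray
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

    ray.init(address="auto")  # attach to a running Ray cluster (assumption)


    @ray.remote
    class EchoWorker:  # stand-in for AReaL's RayWorker
        def node_ip(self):
            return ray.util.get_node_ip_address()


    n_nodes = 2              # illustrative cluster shape
    n_worker_per_node = 4
    cpus_per_worker = 2
    gpus_per_worker = 1

    # One bundle per node, sized to hold every worker assigned to that node.
    pg = ray.util.placement_group(
        bundles=[
            {
                "CPU": cpus_per_worker * n_worker_per_node,
                "GPU": gpus_per_worker * n_worker_per_node,
            }
        ]
        * n_nodes,
    )
    try:
        # Fail fast if the cluster cannot host one bundle per node.
        ray.get(pg.ready(), timeout=30)
    except ray.exceptions.GetTimeoutError:
        raise RuntimeError("cluster cannot satisfy the per-node bundles")

    workers = []
    for node_id in range(n_nodes):
        for i in range(n_worker_per_node):
            idx = node_id * n_worker_per_node + i
            workers.append(
                EchoWorker.options(
                    name=f"worker/{idx}",
                    num_cpus=cpus_per_worker,
                    num_gpus=gpus_per_worker,
                    # Pin this actor to the bundle reserved for node `node_id`.
                    scheduling_strategy=PlacementGroupSchedulingStrategy(
                        placement_group=pg,
                        placement_group_bundle_index=node_id,
                        placement_group_capture_child_tasks=True,
                    ),
                ).remote()
            )

    # Actors sharing a bundle index report the same node IP.
    print(ray.get([w.node_ip.remote() for w in workers]))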
@@ -222,7 +272,6 @@ def _run_experiment(exp_cfg, expr_name, trial_name):
                     kv_store_name=ray_kv_store_name,
                 )
                 workers.append(worker)

         all_workers[worker_type] = workers

     try:
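For CPU-only worker types the commit keeps Ray's built-in SPREAD strategy, which spreads actors across nodes on a best-effort basis and leaves the GPU nodes' capacity for the packed GPU workers. A minimal sketch of that branch (actor class and counts are illustrative):

    # Sketch: spread CPU-only actors across the cluster with the built-in
    # SPREAD strategy, without reserving any placement group.
    import ray

    ray.init(address="auto")  # attach to a running Ray cluster (assumption)


    @ray.remote
    class CpuWorker:  # stand-in for AReaL's RayWorker
        def ping(self):
            return "ok"


    # SPREAD places the actors across available nodes on a best-effort
    # basis, keeping CPU workers out of the GPU workers' way.
    cpu_workers = [
        CpuWorker.options(
            name=f"cpu_worker/{i}",
            num_cpus=1,
            scheduling_strategy="SPREAD",
        ).remote()
        for i in range(8)
    ]
    print(ray.get([w.ping.remote() for w in cpu_workers]))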