slurm_scheduler, dir_workspace: add isolated workspaces for Slurm #416

Closed · wants to merge 1 commit
1 change: 1 addition & 0 deletions docs/requirements.txt
@@ -7,3 +7,4 @@ papermill
ipykernel
nbsphinx
jupytext
ipython_genutils
12 changes: 12 additions & 0 deletions docs/source/workspace.rst
@@ -9,6 +9,8 @@ torchx.workspace
.. autoclass:: Workspace
:members:

.. autofunction:: walk_workspace

torchx.workspace.docker_workspace
#######################################

@@ -19,3 +21,13 @@ torchx.workspace.docker_workspace
.. autoclass:: DockerWorkspace
:members:
:private-members: _update_app_images, _push_images

torchx.workspace.dir_workspace
#######################################


.. automodule:: torchx.workspace.dir_workspace
.. currentmodule:: torchx.workspace.dir_workspace

.. autoclass:: DirWorkspace
:members:
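
Note: DirWorkspace is documented above, but its implementation is not part of the hunks shown on this page. The following is only a rough sketch of the contract it appears to fulfil, based on the SlurmScheduler docstring further down ("create a new isolated directory with a snapshot of the workspace") and the three-argument workspace hook used throughout this PR; the class name and the body are illustrative assumptions, not the real code.

import shutil
from typing import Mapping

from torchx.specs import CfgVal, Role


class IllustrativeDirWorkspace:
    """Not the real DirWorkspace; a sketch of the behavior implied by this PR."""

    def build_workspace_and_update_role(
        self, role: Role, workspace: str, cfg: Mapping[str, CfgVal]
    ) -> None:
        job_dir = cfg.get("job_dir")
        if job_dir is None:
            return
        # Assumed behavior: snapshot the local workspace into a new, isolated
        # directory. copytree fails if the target already exists, which matches
        # the "directory must not exist" wording of the job_dir run option.
        shutil.copytree(workspace, str(job_dir))
        role.image = str(job_dir)  # assumption: the role then points at the snapshot
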
6 changes: 5 additions & 1 deletion scripts/slurmtest.sh
@@ -15,6 +15,8 @@ DIR="$BASE_DIR/project"
mkdir "$DIR"
cd "$DIR"

JOB_DIR="$BASE_DIR/job"

# shellcheck disable=SC1091
source /opt/slurm/etc/slurm.sh
sbatch --version
@@ -32,10 +34,12 @@ partition=compute
time=10
comment=hello
nomem=true
job_dir=$JOB_DIR
EOT

cat <<EOT > main.py
print("hello world!")
import sys
print("hello world!", file=sys.stderr)
EOT

APP_ID="$(torchx run --wait --log --scheduler slurm dist.ddp -j 2x1 --script main.py)"
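
For reference, the options above are the scheduler run cfg that slurmtest.sh feeds to torchx run, with job_dir as the newly added entry. The target file name and its section header sit outside this hunk, so the reconstruction below is an assumption (a .torchxconfig-style section for the slurm scheduler) with a made-up path:

[slurm]
partition=compute
time=10
comment=hello
nomem=true
job_dir=/tmp/torchx-slurm-test/job
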
2 changes: 1 addition & 1 deletion torchx/runner/api.py
@@ -283,7 +283,7 @@ def dryrun(
logger.info(
f"Building workspace: {workspace} for role[0]: {role.name}, image: {old_img}"
)
sched.build_workspace_and_update_role(role, workspace)
sched.build_workspace_and_update_role(role, workspace, cfg)
logger.info("Done building workspace")
if old_img != role.image:
logger.info(f"New image: {role.image} built from workspace")
5 changes: 4 additions & 1 deletion torchx/runner/test/api_test.py
@@ -154,7 +154,10 @@ def _cancel_existing(self, app_id: str) -> None:
pass

def build_workspace_and_update_role(
self, role: Role, workspace: str
self,
role: Role,
workspace: str,
cfg: Mapping[str, CfgVal],
) -> None:
if self.build_new_img:
role.image = f"{role.image}_new"
2 changes: 1 addition & 1 deletion torchx/schedulers/api.py
@@ -109,7 +109,7 @@ def submit(
sched = self
assert isinstance(sched, Workspace)
role = app.roles[0]
sched.build_workspace_and_update_role(role, workspace)
sched.build_workspace_and_update_role(role, workspace, cfg)
dryrun_info = self.submit_dryrun(app, cfg)
return self.schedule(dryrun_info)

73 changes: 62 additions & 11 deletions torchx/schedulers/slurm_scheduler.py
@@ -20,6 +20,7 @@
from datetime import datetime
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple

import torchx
from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler, Stream
from torchx.schedulers.local_scheduler import LogIterator
from torchx.specs import (
@@ -34,7 +35,9 @@
macros,
runopts,
)
from torchx.workspace.dir_workspace import DirWorkspace

SLURM_JOB_DIRS = ".torchxslurmjobdirs"

SLURM_STATES: Mapping[str, AppState] = {
"BOOT_FAIL": AppState.FAILED,
@@ -166,6 +169,7 @@ class SlurmBatchRequest:

cmd: List[str]
replicas: Dict[str, SlurmReplicaRequest]
job_dir: Optional[str]

def materialize(self) -> str:
"""
@@ -186,7 +190,12 @@ def materialize(self) -> str:
sbatch_opts = "#SBATCH hetjob\n".join(
f"#SBATCH {group}\n" for group in sbatch_groups
)
cmd = " ".join([shlex.quote(arg) for arg in self.cmd])
script = f"""#!/bin/bash
#
# Generated by TorchX {torchx.__version__}
# Run with: {cmd}
#
{sbatch_opts}
# exit on error
set -e
@@ -207,7 +216,7 @@ def __repr__(self) -> str:
{self.materialize()}"""


class SlurmScheduler(Scheduler):
class SlurmScheduler(Scheduler, DirWorkspace):
"""
SlurmScheduler is a TorchX scheduling interface to slurm. TorchX expects
that slurm CLI tools are locally installed and job accounting is enabled.
@@ -261,11 +270,8 @@ class SlurmScheduler(Scheduler):
Partial support. SlurmScheduler will return job and replica
status but does not provide the complete original AppSpec.
workspaces: |
Partial support. Typical Slurm usage is from a shared NFS mount
so code will automatically be updated on the workers.
SlurmScheduler does not support programmatic patching via
WorkspaceScheduler.

If ``job_dir`` is specified the DirWorkspace will create a new
isolated directory with a snapshot of the workspace.
"""

def __init__(self, session_name: str) -> None:
@@ -283,7 +289,9 @@ def run_opts(self) -> runopts:
"time",
type_=str,
default=None,
help="The maximum time the job is allowed to run for.",
help='The maximum time the job is allowed to run for. Formats: \
"minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours", \
"days-hours:minutes" or "days-hours:minutes:seconds"',
)
opts.add(
"nomem",
@@ -311,25 +319,43 @@ def run_opts(self) -> runopts:
type_=str,
help="What events to mail users on.",
)
opts.add(
"job_dir",
type_=str,
help="""The directory to place the job code and outputs. The
directory must not exist and will be created. To enable log
iteration, jobs will be tracked in ``.torchxslurmjobdirs``.
""",
)
return opts

def schedule(self, dryrun_info: AppDryRunInfo[SlurmBatchRequest]) -> str:
req = dryrun_info.request
job_dir = req.job_dir
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(job_dir or tmpdir, "torchx-sbatch.sh")
if job_dir is not None:
req.cmd += [f"--chdir={job_dir}"]
req.cmd += [path]
script = req.materialize()
path = os.path.join(tmpdir, "job.sh")

with open(path, "w") as f:
f.write(script)

cmd = req.cmd + [path]
p = subprocess.run(req.cmd, stdout=subprocess.PIPE, check=True)
job_id = p.stdout.decode("utf-8").strip()

if job_dir is not None:
_save_job_dir(job_id, job_dir)

p = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
return p.stdout.decode("utf-8").strip()
return job_id

def _submit_dryrun(
self, app: AppDef, cfg: Mapping[str, CfgVal]
) -> AppDryRunInfo[SlurmBatchRequest]:
job_dir = cfg.get("job_dir")
assert job_dir is None or isinstance(job_dir, str), "job_dir must be str"

replicas = {}
for role in app.roles:
for replica_id in range(role.num_replicas):
@@ -351,6 +377,7 @@ def _submit_dryrun(
req = SlurmBatchRequest(
cmd=cmd,
replicas=replicas,
job_dir=job_dir,
)

return AppDryRunInfo(req, repr)
@@ -443,6 +470,9 @@ def log_iter(
)

log_file = f"slurm-{app_id}-{role_name}-{k}.{extension}"
job_dirs = _get_job_dirs()
if app_id in job_dirs:
log_file = os.path.join(job_dirs[app_id], log_file)

return LogIterator(
app_id, regex or ".*", log_file, self, should_tail=should_tail
@@ -453,3 +483,24 @@ def create_scheduler(session_name: str, **kwargs: Any) -> SlurmScheduler:
return SlurmScheduler(
session_name=session_name,
)


def _save_job_dir(job_id: str, job_dir: str) -> None:
with open(SLURM_JOB_DIRS, "at") as f:
f.write(f"{job_id} = {job_dir}\n")


def _get_job_dirs() -> Mapping[str, str]:
try:
with open(SLURM_JOB_DIRS, "rt") as f:
lines = f.readlines()
except FileNotFoundError:
return {}

out = {}
for line in lines:
first, _, second = line.partition("=")
if not first or not second:
continue
out[first.strip()] = second.strip()
return out
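
The two helpers above record which submitted job ids map to which job directories, so that log_iter can later resolve log files inside the job directory rather than the current working directory. A minimal round trip, runnable against this module as written; the job ids and paths below are made up, and the tracking file is created relative to the current working directory:

import os
import tempfile

from torchx.schedulers.slurm_scheduler import _get_job_dirs, _save_job_dir

with tempfile.TemporaryDirectory() as tmpdir:
    os.chdir(tmpdir)  # .torchxslurmjobdirs is written to the cwd
    _save_job_dir("1234", "/data/torchx/job-1234")
    _save_job_dir("1235", "/data/torchx/job-1235")
    # Each call appends a "<job_id> = <job_dir>" line; _get_job_dirs parses the
    # file back into a mapping and skips malformed lines.
    assert _get_job_dirs() == {
        "1234": "/data/torchx/job-1234",
        "1235": "/data/torchx/job-1235",
    }
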
4 changes: 3 additions & 1 deletion torchx/schedulers/test/api_test.py
@@ -69,7 +69,9 @@ def run_opts(self) -> runopts:
def resolve_resource(self, resource: Union[str, Resource]) -> Resource:
return NULL_RESOURCE

def build_workspace_and_update_role(self, role: Role, workspace: str) -> None:
def build_workspace_and_update_role(
self, role: Role, workspace: str, cfg: Mapping[str, CfgVal]
) -> None:
role.image = workspace

def test_invalid_run_cfg(self) -> None:
107 changes: 73 additions & 34 deletions torchx/schedulers/test/slurm_scheduler_test.py
@@ -13,13 +13,16 @@
from typing import Generator
from unittest.mock import MagicMock, call, patch

import torchx
from torchx import specs
from torchx.schedulers.api import DescribeAppResponse, Stream
from torchx.schedulers.slurm_scheduler import (
SlurmBatchRequest,
SlurmReplicaRequest,
SlurmScheduler,
create_scheduler,
_save_job_dir,
_get_job_dirs,
)


@@ -179,7 +182,11 @@ def test_dryrun_multi_role(self) -> None:
script = req.materialize()
self.assertEqual(
script,
"""#!/bin/bash
f"""#!/bin/bash
#
# Generated by TorchX {torchx.__version__}
# Run with: sbatch --parsable
#
#SBATCH --job-name=a-0 --ntasks-per-node=1
#SBATCH hetjob
#SBATCH --job-name=a-1 --ntasks-per-node=1
@@ -348,44 +355,49 @@ def test_describe_running(self, run: MagicMock) -> None:
def test_log_iter(self, run: MagicMock) -> None:
scheduler = create_scheduler("foo")

with tmp_cwd():
with open("slurm-54-echo-1.out", "wt") as f:
f.write("hello\nworld\n")

logs = list(
scheduler.log_iter(
"54",
"echo",
1,
streams=Stream.STDOUT,
since=datetime.datetime.now(),
for job_dir in ["", "dir"]:
with tmp_cwd():
if job_dir:
os.mkdir(job_dir)
_save_job_dir("54", job_dir)

with open(os.path.join(job_dir, "slurm-54-echo-1.out"), "wt") as f:
f.write("hello\nworld\n")

logs = list(
scheduler.log_iter(
"54",
"echo",
1,
streams=Stream.STDOUT,
since=datetime.datetime.now(),
)
)
)
self.assertEqual(logs, ["hello", "world"])

with open("slurm-54-echo-1.err", "wt") as f:
f.write("foo\nbar\n")

logs = list(
scheduler.log_iter(
"54",
"echo",
1,
streams=Stream.STDERR,
self.assertEqual(logs, ["hello", "world"])

with open(os.path.join(job_dir, "slurm-54-echo-1.err"), "wt") as f:
f.write("foo\nbar\n")

logs = list(
scheduler.log_iter(
"54",
"echo",
1,
streams=Stream.STDERR,
)
)
)

self.assertEqual(logs, ["foo", "bar"])
self.assertEqual(logs, ["foo", "bar"])

# no stream specified should default to STDERR
logs = list(
scheduler.log_iter(
"54",
"echo",
1,
# no stream specified should default to STDERR
logs = list(
scheduler.log_iter(
"54",
"echo",
1,
)
)
)
self.assertEqual(logs, ["foo", "bar"])
self.assertEqual(logs, ["foo", "bar"])

with self.assertRaises(ValueError):
scheduler.log_iter("54", "echo", 1, streams=Stream.COMBINED)
Expand Down Expand Up @@ -422,3 +434,30 @@ def test_dryrun_mail(self) -> None:
"--mail-type=END",
info.request.cmd,
)

@patch("subprocess.run")
def test_run_workspace_job_dir(self, run: MagicMock) -> None:
with tmp_cwd():
run.return_value.stdout = b"1234"
scheduler = create_scheduler("foo")
scheduler.submit(
simple_app(),
cfg={
"job_dir": "dir",
},
workspace=".",
)
self.assertIn(("1234", "dir"), _get_job_dirs().items())

self.assertEqual(run.call_count, 1)
args, kwargs = run.call_args
(args,) = args
self.assertEqual(
args,
[
"sbatch",
"--parsable",
"--chdir=dir",
"dir/torchx-sbatch.sh",
],
)
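
As a closing note, the new script header and the job_dir plumbing can be previewed without a Slurm cluster through a scheduler-level dry run. The snippet below is a sketch rather than part of the PR; the app definition, session name, and paths are made up:

from torchx import specs
from torchx.schedulers.slurm_scheduler import create_scheduler

app = specs.AppDef(
    name="echo",
    roles=[
        specs.Role(
            name="echo",
            image="/tmp/echo",  # placeholder image/path
            entrypoint="echo",
            args=["hello world"],
            num_replicas=1,
            resource=specs.Resource(cpu=1, gpu=0, memMB=1024),
        )
    ],
)

scheduler = create_scheduler("demo")
info = scheduler.submit_dryrun(app, cfg={"job_dir": "/tmp/torchx-demo-job"})
# info.request.job_dir carries the cfg value through to schedule(); the
# materialized script starts with the "# Generated by TorchX <version>" block
# shown in test_dryrun_multi_role above.
print(info.request.materialize())

With job_dir set, schedule() additionally writes the materialized script to <job_dir>/torchx-sbatch.sh and submits it with --chdir=<job_dir>, as asserted in test_run_workspace_job_dir above.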