
Commit 9b60e81

slurm_scheduler, dir_workspace: add isolated workspaces for Slurm
1 parent f5e5765 commit 9b60e81

14 files changed: 382 additions, 85 deletions

docs/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -7,3 +7,4 @@ papermill
 ipykernel
 nbsphinx
 jupytext
+ipython_genutils

docs/source/workspace.rst

Lines changed: 12 additions & 0 deletions
@@ -9,6 +9,8 @@ torchx.workspace
 .. autoclass:: Workspace
    :members:
 
+.. autofunction:: walk_workspace
+
 torchx.workspace.docker_workspace
 #######################################
 
@@ -19,3 +21,13 @@ torchx.workspace.docker_workspace
 .. autoclass:: DockerWorkspace
    :members:
    :private-members: _update_app_images, _push_images
+
+torchx.workspace.dir_workspace
+#######################################
+
+
+.. automodule:: torchx.workspace.dir_workspace
+.. currentmodule:: torchx.workspace.dir_workspace
+
+.. autoclass:: DirWorkspace
+   :members:
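
The dir_workspace module documented here is what the Slurm changes below build on. As a rough illustration only (the mixin pattern and the job_dir behavior are taken from this commit; the scheduler body and class name are illustrative):

    # Sketch: how a scheduler opts into directory-based workspaces, mirroring the
    # SlurmScheduler(Scheduler, DirWorkspace) change later in this commit.
    from torchx.schedulers.api import Scheduler
    from torchx.workspace.dir_workspace import DirWorkspace


    class MyDirScheduler(Scheduler, DirWorkspace):
        # DirWorkspace supplies build_workspace_and_update_role(role, workspace, cfg);
        # per the SlurmScheduler docstring added below, it creates a new isolated
        # directory with a snapshot of the workspace when ``job_dir`` is set.
        ...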

scripts/slurmtest.sh

Lines changed: 5 additions & 1 deletion
@@ -15,6 +15,8 @@ DIR="$BASE_DIR/project"
 mkdir "$DIR"
 cd "$DIR"
 
+JOB_DIR="$BASE_DIR/job"
+
 # shellcheck disable=SC1091
 source /opt/slurm/etc/slurm.sh
 sbatch --version
@@ -32,10 +34,12 @@ partition=compute
 time=10
 comment=hello
 nomem=true
+job_dir=$JOB_DIR
 EOT
 
 cat <<EOT > main.py
-print("hello world!")
+import sys
+print("hello world!", file=sys.stderr)
 EOT
 
 APP_ID="$(torchx run --wait --log --scheduler slurm dist.ddp -j 2x1 --script main.py)"
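
For context, the job_dir value written into the generated config above is consumed as a Slurm scheduler run option. A rough Python equivalent of what the smoke test exercises, mirroring the new unit test at the end of this commit (the AppDef below is illustrative rather than part of the commit, and a real run requires a Slurm cluster):

    from torchx.schedulers.slurm_scheduler import create_scheduler
    from torchx.specs import AppDef, Resource, Role

    # Illustrative app; slurmtest.sh launches the dist.ddp component instead.
    app = AppDef(
        name="echo",
        roles=[
            Role(
                name="echo",
                image="/tmp",
                entrypoint="echo",
                args=["hello world!"],
                resource=Resource(cpu=1, gpu=0, memMB=1024),
            )
        ],
    )

    scheduler = create_scheduler("test")
    app_id = scheduler.submit(
        app,
        cfg={"job_dir": "/home/user/job"},  # same role as job_dir=$JOB_DIR above
        workspace=".",                      # snapshot the current project directory
    )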

torchx/runner/api.py

Lines changed: 1 addition & 1 deletion
@@ -283,7 +283,7 @@ def dryrun(
             logger.info(
                 f"Building workspace: {workspace} for role[0]: {role.name}, image: {old_img}"
             )
-            sched.build_workspace_and_update_role(role, workspace)
+            sched.build_workspace_and_update_role(role, workspace, cfg)
             logger.info("Done building workspace")
             if old_img != role.image:
                 logger.info(f"New image: {role.image} built from workspace")
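
Note that this is a signature change for Workspace implementations: build_workspace_and_update_role now also receives the scheduler run cfg. A minimal sketch of an implementation consuming the new argument (the use of the job_dir option here is illustrative, not prescribed by the commit):

    from typing import Mapping

    from torchx.specs import CfgVal, Role
    from torchx.workspace import Workspace


    class MyWorkspace(Workspace):
        def build_workspace_and_update_role(
            self, role: Role, workspace: str, cfg: Mapping[str, CfgVal]
        ) -> None:
            # cfg is the same run config the scheduler receives, so workspace
            # behavior can be driven by scheduler options such as "job_dir".
            job_dir = cfg.get("job_dir")
            if isinstance(job_dir, str):
                role.image = job_dir  # illustrative: point the role at the new dir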

torchx/runner/test/api_test.py

Lines changed: 4 additions & 1 deletion
@@ -154,7 +154,10 @@ def _cancel_existing(self, app_id: str) -> None:
         pass
 
     def build_workspace_and_update_role(
-        self, role: Role, workspace: str
+        self,
+        role: Role,
+        workspace: str,
+        cfg: Mapping[str, CfgVal],
     ) -> None:
         if self.build_new_img:
             role.image = f"{role.image}_new"

torchx/schedulers/api.py

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ def submit(
             sched = self
             assert isinstance(sched, Workspace)
             role = app.roles[0]
-            sched.build_workspace_and_update_role(role, workspace)
+            sched.build_workspace_and_update_role(role, workspace, cfg)
         dryrun_info = self.submit_dryrun(app, cfg)
         return self.schedule(dryrun_info)
 

torchx/schedulers/slurm_scheduler.py

Lines changed: 57 additions & 10 deletions
@@ -34,7 +34,9 @@
     macros,
     runopts,
 )
+from torchx.workspace.dir_workspace import DirWorkspace
 
+SLURM_JOB_DIRS = ".torchxslurmjobdirs"
 
 SLURM_STATES: Mapping[str, AppState] = {
     "BOOT_FAIL": AppState.FAILED,
@@ -166,6 +168,7 @@ class SlurmBatchRequest:
 
     cmd: List[str]
     replicas: Dict[str, SlurmReplicaRequest]
+    job_dir: Optional[str]
 
     def materialize(self) -> str:
         """
@@ -200,7 +203,7 @@ def materialize(self) -> str:
         return script
 
 
-class SlurmScheduler(Scheduler):
+class SlurmScheduler(Scheduler, DirWorkspace):
     """
     SlurmScheduler is a TorchX scheduling interface to slurm. TorchX expects
     that slurm CLI tools are locally installed and job accounting is enabled.
@@ -254,11 +257,8 @@ class SlurmScheduler(Scheduler):
             Partial support. SlurmScheduler will return job and replica
             status but does not provide the complete original AppSpec.
         workspaces: |
-            Partial support. Typical Slurm usage is from a shared NFS mount
-            so code will automatically be updated on the workers.
-            SlurmScheduler does not support programmatic patching via
-            WorkspaceScheduler.
-
+            If ``job_dir`` is specified the DirWorkspace will create a new
+            isolated directory with a snapshot of the workspace.
     """
 
     def __init__(self, session_name: str) -> None:
@@ -276,7 +276,9 @@ def run_opts(self) -> runopts:
             "time",
             type_=str,
             default=None,
-            help="The maximum time the job is allowed to run for.",
+            help='The maximum time the job is allowed to run for. Formats: \
+            "minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours", \
+            "days-hours:minutes" or "days-hours:minutes:seconds"',
         )
         opts.add(
             "nomem",
@@ -304,25 +306,45 @@ def run_opts(self) -> runopts:
             type_=str,
             help="What events to mail users on.",
         )
+        opts.add(
+            "job_dir",
+            type_=str,
+            help="""The directory to place the job code and outputs. The
+            directory must not exist and will be created. To enable log
+            iteration, jobs will be tracked in ``.torchxslurmjobdirs``.
+            """,
+        )
         return opts
 
     def schedule(self, dryrun_info: AppDryRunInfo[SlurmBatchRequest]) -> str:
         req = dryrun_info.request
+        job_dir = req.job_dir
         with tempfile.TemporaryDirectory() as tmpdir:
             script = req.materialize()
-            path = os.path.join(tmpdir, "job.sh")
+            path = os.path.join(job_dir or tmpdir, "torchx-sbatch.sh")
 
             with open(path, "w") as f:
                 f.write(script)
 
-            cmd = req.cmd + [path]
+            cmd = req.cmd
+            if job_dir is not None:
+                cmd += [f"--chdir={job_dir}"]
+            cmd += [path]
 
             p = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
-            return p.stdout.decode("utf-8").strip()
+            job_id = p.stdout.decode("utf-8").strip()
+
+            if job_dir is not None:
+                _save_job_dir(job_id, job_dir)
+
+            return job_id
 
     def _submit_dryrun(
         self, app: AppDef, cfg: Mapping[str, CfgVal]
     ) -> AppDryRunInfo[SlurmBatchRequest]:
+        job_dir = cfg.get("job_dir")
+        assert job_dir is None or isinstance(job_dir, str), "job_dir must be str"
+
         replicas = {}
         for role in app.roles:
             for replica_id in range(role.num_replicas):
@@ -344,6 +366,7 @@ def _submit_dryrun(
         req = SlurmBatchRequest(
             cmd=cmd,
             replicas=replicas,
+            job_dir=job_dir,
         )
         return AppDryRunInfo(req, repr)
 
@@ -435,6 +458,9 @@ def log_iter(
             )
 
         log_file = f"slurm-{app_id}-{role_name}-{k}.{extension}"
+        job_dirs = _get_job_dirs()
+        if app_id in job_dirs:
+            log_file = os.path.join(job_dirs[app_id], log_file)
 
         return LogIterator(
             app_id, regex or ".*", log_file, self, should_tail=should_tail
@@ -445,3 +471,24 @@ def create_scheduler(session_name: str, **kwargs: Any) -> SlurmScheduler:
     return SlurmScheduler(
         session_name=session_name,
     )
+
+
+def _save_job_dir(job_id: str, job_dir: str) -> None:
+    with open(SLURM_JOB_DIRS, "at") as f:
+        f.write(f"{job_id} = {job_dir}\n")
+
+
+def _get_job_dirs() -> Mapping[str, str]:
+    try:
+        with open(SLURM_JOB_DIRS, "rt") as f:
+            lines = f.readlines()
+    except FileNotFoundError:
+        return {}
+
+    out = {}
+    for line in lines:
+        first, _, second = line.partition("=")
+        if not first or not second:
+            continue
+        out[first.strip()] = second.strip()
+    return out
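
The job-dir bookkeeping added above is a plain text file in the current working directory, one "job_id = job_dir" line per submission. A minimal sketch of the round trip using only the new helpers (the job id and path are made up):

    from torchx.schedulers.slurm_scheduler import _get_job_dirs, _save_job_dir

    # Appends "1234 = /home/user/job" to .torchxslurmjobdirs in the cwd.
    _save_job_dir("1234", "/home/user/job")

    # Parses the file back into a {job_id: job_dir} mapping; a missing file
    # yields an empty dict, so log_iter falls back to the default slurm-*.out
    # location for jobs that were not submitted with a job_dir.
    dirs = _get_job_dirs()
    assert dirs["1234"] == "/home/user/job"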

torchx/schedulers/test/api_test.py

Lines changed: 3 additions & 1 deletion
@@ -69,7 +69,9 @@ def run_opts(self) -> runopts:
     def resolve_resource(self, resource: Union[str, Resource]) -> Resource:
         return NULL_RESOURCE
 
-    def build_workspace_and_update_role(self, role: Role, workspace: str) -> None:
+    def build_workspace_and_update_role(
+        self, role: Role, workspace: str, cfg: Mapping[str, CfgVal]
+    ) -> None:
         role.image = workspace
 
     def test_invalid_run_cfg(self) -> None:

torchx/schedulers/test/slurm_scheduler_test.py

Lines changed: 68 additions & 33 deletions
@@ -20,6 +20,8 @@
     SlurmReplicaRequest,
     SlurmScheduler,
     create_scheduler,
+    _save_job_dir,
+    _get_job_dirs,
 )
 
 
@@ -348,44 +350,50 @@ def test_describe_running(self, run: MagicMock) -> None:
     def test_log_iter(self, run: MagicMock) -> None:
         scheduler = create_scheduler("foo")
 
-        with tmp_cwd():
-            with open("slurm-54-echo-1.out", "wt") as f:
-                f.write("hello\nworld\n")
-
-            logs = list(
-                scheduler.log_iter(
-                    "54",
-                    "echo",
-                    1,
-                    streams=Stream.STDOUT,
-                    since=datetime.datetime.now(),
+        for job_dir in ["", "dir"]:
+            print("job_dir", job_dir)
+            with tmp_cwd():
+                if job_dir:
+                    os.mkdir(job_dir)
+                    _save_job_dir("54", job_dir)
+
+                with open(os.path.join(job_dir, "slurm-54-echo-1.out"), "wt") as f:
+                    f.write("hello\nworld\n")
+
+                logs = list(
+                    scheduler.log_iter(
+                        "54",
+                        "echo",
+                        1,
+                        streams=Stream.STDOUT,
+                        since=datetime.datetime.now(),
+                    )
                 )
-            )
-            self.assertEqual(logs, ["hello", "world"])
-
-            with open("slurm-54-echo-1.err", "wt") as f:
-                f.write("foo\nbar\n")
-
-            logs = list(
-                scheduler.log_iter(
-                    "54",
-                    "echo",
-                    1,
-                    streams=Stream.STDERR,
+                self.assertEqual(logs, ["hello", "world"])
+
+                with open(os.path.join(job_dir, "slurm-54-echo-1.err"), "wt") as f:
+                    f.write("foo\nbar\n")
+
+                logs = list(
+                    scheduler.log_iter(
+                        "54",
+                        "echo",
+                        1,
+                        streams=Stream.STDERR,
+                    )
                 )
-            )
 
-            self.assertEqual(logs, ["foo", "bar"])
+                self.assertEqual(logs, ["foo", "bar"])
 
-            # no stream specified should default to STDERR
-            logs = list(
-                scheduler.log_iter(
-                    "54",
-                    "echo",
-                    1,
+                # no stream specified should default to STDERR
+                logs = list(
+                    scheduler.log_iter(
+                        "54",
+                        "echo",
+                        1,
+                    )
                 )
-            )
-            self.assertEqual(logs, ["foo", "bar"])
+                self.assertEqual(logs, ["foo", "bar"])
 
         with self.assertRaises(ValueError):
             scheduler.log_iter("54", "echo", 1, streams=Stream.COMBINED)
@@ -422,3 +430,30 @@ def test_dryrun_mail(self) -> None:
             "--mail-type=END",
             info.request.cmd,
         )
+
+    @patch("subprocess.run")
+    def test_run_workspace_job_dir(self, run: MagicMock) -> None:
+        with tmp_cwd():
+            run.return_value.stdout = b"1234"
+            scheduler = create_scheduler("foo")
+            scheduler.submit(
+                simple_app(),
+                cfg={
+                    "job_dir": "dir",
+                },
+                workspace=".",
+            )
+            self.assertIn(("1234", "dir"), _get_job_dirs().items())
+
+            self.assertEqual(run.call_count, 1)
+            args, kwargs = run.call_args
+            (args,) = args
+            self.assertEqual(
+                args,
+                [
+                    "sbatch",
+                    "--parsable",
+                    "--chdir=dir",
+                    "dir/torchx-sbatch.sh",
+                ],
+            )
