Commit cc1ae1a

slurm_scheduler, dir_workspace: add isolated workspaces for Slurm
1 parent f5e5765 commit cc1ae1a

11 files changed: +282 −49 lines changed

docs/source/workspace.rst

Lines changed: 12 additions & 0 deletions
@@ -9,6 +9,8 @@ torchx.workspace
 .. autoclass:: Workspace
    :members:
 
+.. autofunction:: walk_workspace
+
 torchx.workspace.docker_workspace
 #######################################
 
@@ -19,3 +21,13 @@ torchx.workspace.docker_workspace
 .. autoclass:: DockerWorkspace
    :members:
    :private-members: _update_app_images, _push_images
+
+torchx.workspace.dir_workspace
+#######################################
+
+
+.. automodule:: torchx.workspace.dir_workspace
+.. currentmodule:: torchx.workspace.dir_workspace
+
+.. autoclass:: DirWorkspace
+   :members:

scripts/slurmtest.sh

Lines changed: 5 additions & 1 deletion
@@ -15,6 +15,8 @@ DIR="$BASE_DIR/project"
 mkdir "$DIR"
 cd "$DIR"
 
+JOB_DIR="$BASE_DIR/job"
+
 # shellcheck disable=SC1091
 source /opt/slurm/etc/slurm.sh
 sbatch --version
@@ -32,10 +34,12 @@ partition=compute
 time=10
 comment=hello
 nomem=true
+job_dir=$JOB_DIR
 EOT
 
 cat <<EOT > main.py
-print("hello world!")
+import sys
+print("hello world!", file=sys.stderr)
 EOT
 
 APP_ID="$(torchx run --wait --log --scheduler slurm dist.ddp -j 2x1 --script main.py)"

torchx/runner/api.py

Lines changed: 1 addition & 1 deletion
@@ -283,7 +283,7 @@ def dryrun(
             logger.info(
                 f"Building workspace: {workspace} for role[0]: {role.name}, image: {old_img}"
             )
-            sched.build_workspace_and_update_role(role, workspace)
+            sched.build_workspace_and_update_role(role, workspace, cfg)
             logger.info("Done building workspace")
             if old_img != role.image:
                 logger.info(f"New image: {role.image} built from workspace")

torchx/schedulers/api.py

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ def submit(
             sched = self
             assert isinstance(sched, Workspace)
             role = app.roles[0]
-            sched.build_workspace_and_update_role(role, workspace)
+            sched.build_workspace_and_update_role(role, workspace, cfg)
         dryrun_info = self.submit_dryrun(app, cfg)
         return self.schedule(dryrun_info)
 
torchx/schedulers/slurm_scheduler.py

Lines changed: 52 additions & 10 deletions
@@ -34,7 +34,9 @@
     macros,
     runopts,
 )
+from torchx.workspace.dir_workspace import DirWorkspace
 
+SLURM_JOB_DIRS = ".torchxslurmjobdirs"
 
 SLURM_STATES: Mapping[str, AppState] = {
     "BOOT_FAIL": AppState.FAILED,
@@ -166,6 +168,7 @@ class SlurmBatchRequest:
 
     cmd: List[str]
     replicas: Dict[str, SlurmReplicaRequest]
+    job_dir: Optional[str]
 
     def materialize(self) -> str:
         """
@@ -200,7 +203,7 @@ def materialize(self) -> str:
         return script
 
 
-class SlurmScheduler(Scheduler):
+class SlurmScheduler(Scheduler, DirWorkspace):
     """
     SlurmScheduler is a TorchX scheduling interface to slurm. TorchX expects
     that slurm CLI tools are locally installed and job accounting is enabled.
@@ -254,11 +257,8 @@ class SlurmScheduler(Scheduler):
             Partial support. SlurmScheduler will return job and replica
             status but does not provide the complete original AppSpec.
         workspaces: |
-            Partial support. Typical Slurm usage is from a shared NFS mount
-            so code will automatically be updated on the workers.
-            SlurmScheduler does not support programmatic patching via
-            WorkspaceScheduler.
-
+            If ``job_dir`` is specified the DirWorkspace will create a new
+            isolated directory with a snapshot of the workspace.
     """
 
     def __init__(self, session_name: str) -> None:
@@ -276,7 +276,9 @@ def run_opts(self) -> runopts:
             "time",
             type_=str,
             default=None,
-            help="The maximum time the job is allowed to run for.",
+            help='The maximum time the job is allowed to run for. Formats: \
+            "minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours", \
+            "days-hours:minutes" or "days-hours:minutes:seconds"',
         )
         opts.add(
             "nomem",
@@ -304,25 +306,45 @@ def run_opts(self) -> runopts:
             type_=str,
             help="What events to mail users on.",
         )
+        opts.add(
+            "job_dir",
+            type_=str,
+            help="""The directory to place the job code and outputs. The
+            directory must not exist and will be created. To enable log
+            iteration, jobs will be tracked in ``.torchxslurmjobdirs``.
+            """,
+        )
         return opts
 
     def schedule(self, dryrun_info: AppDryRunInfo[SlurmBatchRequest]) -> str:
         req = dryrun_info.request
+        job_dir = req.job_dir
         with tempfile.TemporaryDirectory() as tmpdir:
             script = req.materialize()
-            path = os.path.join(tmpdir, "job.sh")
+            path = os.path.join(job_dir or tmpdir, "torchx-sbatch.sh")
 
             with open(path, "w") as f:
                 f.write(script)
 
-            cmd = req.cmd + [path]
+            cmd = req.cmd
+            if job_dir is not None:
+                cmd += [f"--chdir={job_dir}"]
+            cmd += [path]
 
             p = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
-            return p.stdout.decode("utf-8").strip()
+            job_id = p.stdout.decode("utf-8").strip()
+
+        if job_dir is not None:
+            _save_job_dir(job_id, job_dir)
+
+        return job_id
 
     def _submit_dryrun(
         self, app: AppDef, cfg: Mapping[str, CfgVal]
     ) -> AppDryRunInfo[SlurmBatchRequest]:
+        job_dir = cfg.get("job_dir")
+        assert job_dir is None or isinstance(job_dir, str), "job_dir must be str"
+
         replicas = {}
         for role in app.roles:
             for replica_id in range(role.num_replicas):
@@ -344,6 +366,7 @@ def _submit_dryrun(
         req = SlurmBatchRequest(
             cmd=cmd,
             replicas=replicas,
+            job_dir=job_dir,
         )
         return AppDryRunInfo(req, repr)
 
@@ -435,6 +458,9 @@ def log_iter(
         )
 
         log_file = f"slurm-{app_id}-{role_name}-{k}.{extension}"
+        job_dirs = _get_job_dirs()
+        if app_id in job_dirs:
+            log_file = os.path.join(job_dirs[app_id], log_file)
 
         return LogIterator(
             app_id, regex or ".*", log_file, self, should_tail=should_tail
@@ -445,3 +471,19 @@ def create_scheduler(session_name: str, **kwargs: Any) -> SlurmScheduler:
     return SlurmScheduler(
         session_name=session_name,
     )
+
+def _save_job_dir(job_id: str, job_dir: str) -> None:
+    with open(SLURM_JOB_DIRS, "at") as f:
+        f.write(f"{job_id} = {job_dir}\n")
+
+def _get_job_dirs() -> Mapping[str, str]:
+    with open(SLURM_JOB_DIRS, "rt") as f:
+        lines = f.readlines()
+    out = {}
+    for line in lines:
+        first, _, second = line.partition("=")
+        if not first or not second:
+            continue
+        out[first.strip()] = second.strip()
+    return out
+
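
To make the bookkeeping above concrete: when a job is scheduled with a ``job_dir``, ``_save_job_dir`` appends a ``job_id = job_dir`` line to ``.torchxslurmjobdirs`` in the current working directory, and ``log_iter`` later consults ``_get_job_dirs`` to locate the log files under that directory. A minimal sketch of that round trip (the job id and paths are made-up values, not from this commit):

```python
# Sketch only: exercises the private helpers added above. Writes the
# .torchxslurmjobdirs tracking file into the current working directory.
from torchx.schedulers.slurm_scheduler import _get_job_dirs, _save_job_dir

_save_job_dir("1234", "/data/home/me/jobs/train")  # appends "1234 = /data/home/me/jobs/train"

job_dirs = _get_job_dirs()  # {"1234": "/data/home/me/jobs/train"}
log_file = "slurm-1234-trainer-0.out"
if "1234" in job_dirs:
    # mirrors log_iter: logs for tracked jobs live under the saved job_dir
    log_file = f"{job_dirs['1234']}/{log_file}"
print(log_file)  # /data/home/me/jobs/train/slurm-1234-trainer-0.out
```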

torchx/schedulers/test/api_test.py

Lines changed: 3 additions & 1 deletion
@@ -69,7 +69,9 @@ def run_opts(self) -> runopts:
         def resolve_resource(self, resource: Union[str, Resource]) -> Resource:
             return NULL_RESOURCE
 
-        def build_workspace_and_update_role(self, role: Role, workspace: str) -> None:
+        def build_workspace_and_update_role(
+            self, role: Role, workspace: str, cfg: Mapping[str, CfgVal]
+        ) -> None:
             role.image = workspace
 
     def test_invalid_run_cfg(self) -> None:
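
For context, the signature change threaded through ``torchx/runner/api.py``, ``torchx/schedulers/api.py``, and this test is that ``Workspace.build_workspace_and_update_role`` now also receives the run ``cfg``, so implementations can read workspace-related run options such as ``job_dir``. A minimal, hypothetical implementation of the new signature (illustrative only, not part of this commit):

```python
from typing import Mapping

from torchx.specs import CfgVal, Role
from torchx.workspace.api import Workspace


class PassthroughWorkspace(Workspace):
    """Illustrative only: leaves the code where it is."""

    def build_workspace_and_update_role(
        self, role: Role, workspace: str, cfg: Mapping[str, CfgVal]
    ) -> None:
        # cfg carries the scheduler run options (e.g. job_dir for DirWorkspace);
        # this toy implementation ignores them and points the role at the
        # unmodified workspace path.
        role.image = workspace
```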

torchx/workspace/api.py

Lines changed: 62 additions & 2 deletions
@@ -5,8 +5,16 @@
 # LICENSE file in the root directory of this source tree.
 
 import abc
+import posixpath
+import fnmatch
+from typing import TYPE_CHECKING, Iterable, Mapping, Tuple
 
-from torchx.specs import Role
+from torchx.specs import Role, CfgVal
+
+if TYPE_CHECKING:
+    from fsspec import AbstractFileSystem
+
+TORCHX_IGNORE = ".torchxignore"
 
 
 class Workspace(abc.ABC):
@@ -25,7 +33,9 @@ class Workspace(abc.ABC):
     """
 
     @abc.abstractmethod
-    def build_workspace_and_update_role(self, role: Role, workspace: str) -> None:
+    def build_workspace_and_update_role(
+        self, role: Role, workspace: str, cfg: Mapping[str, CfgVal]
+    ) -> None:
         """
         Builds the specified ``workspace`` with respect to ``img``
         and updates the ``role`` to reflect the built workspace artifacts.
@@ -36,3 +46,53 @@ def build_workspace_and_update_role(self, role: Role, workspace: str) -> None:
         Note: this method mutates the passed ``role``.
         """
         ...
+
+
+def _ignore(s: str, patterns: Iterable[str]) -> bool:
+    match = False
+    for pattern in patterns:
+        if pattern.startswith("!") and fnmatch.fnmatch(s, pattern[1:]):
+            match = False
+        elif fnmatch.fnmatch(s, pattern):
+            match = True
+    return match
+
+
+def walk_workspace(
+    fs: "AbstractFileSystem", path: str, ignore_name: str = TORCHX_IGNORE,
+) -> Iterable[Tuple[str, Iterable[str], Mapping[str, Mapping[str, object]]]]:
+    """
+    walk_workspace walks the filesystem path and applies the ignore rules
+    specified via ``ignore_name``.
+    This follows the rules for ``.dockerignore``.
+    https://docs.docker.com/engine/reference/builder/#dockerignore-file
+    """
+    ignore_patterns = []
+    ignore_path = posixpath.join(path, ignore_name)
+    if fs.exists(ignore_path):
+        with fs.open(ignore_path, "rt") as f:
+            lines = f.readlines()
+        for line in lines:
+            line, _, _ = line.partition("#")
+            line = line.strip()
+            if len(line) == 0 or line == ".":
+                continue
+            ignore_patterns.append(line)
+
+    for dir, dirs, files in fs.walk(path, detail=True):
+        assert isinstance(dir, str), "path must be str"
+        relpath = posixpath.relpath(dir, path)
+        if _ignore(relpath, ignore_patterns):
+            continue
+        dirs = [
+            d for d in dirs if not _ignore(posixpath.join(relpath, d), ignore_patterns)
+        ]
+        files = {
+            file: info
+            for file, info in files.items()
+            if not _ignore(
+                posixpath.join(relpath, file) if relpath != "." else file,
+                ignore_patterns,
+            )
+        }
+        yield dir, dirs, files
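
As a usage illustration (not part of the commit), here is how ``walk_workspace`` behaves against a local directory containing a ``.torchxignore`` file; it assumes ``fsspec`` is installed and the directory layout below is invented:

```python
import os
import tempfile

import fsspec

from torchx.workspace.api import walk_workspace

with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "logs"))
    for name in ("main.py", "notes.txt", os.path.join("logs", "old.log")):
        with open(os.path.join(root, name), "w") as f:
            f.write("stub\n")
    # .dockerignore-style rules: "!" patterns re-include previously ignored paths.
    with open(os.path.join(root, ".torchxignore"), "w") as f:
        f.write("*.txt\nlogs\n")

    fs = fsspec.filesystem("file")
    for dirpath, dirs, files in walk_workspace(fs, root):
        # notes.txt and the logs directory are filtered out by the ignore rules
        print(dirpath, sorted(files))
```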

torchx/workspace/dir_workspace.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from typing import Mapping
2+
import os
3+
import posixpath
4+
import shutil
5+
6+
from torchx.workspace.api import Workspace, walk_workspace
7+
from torchx.specs import Role, CfgVal
8+
9+
import fsspec
10+
11+
class DirWorkspace(Workspace):
12+
def build_workspace_and_update_role(self, role: Role, workspace: str, cfg: Mapping[str, CfgVal]) -> None:
13+
"""
14+
Creates a new directory specified by ``job_dir`` from the workspace. Role
15+
image fields will be set to the ``job_dir``.
16+
17+
Any files listed in the ``.torchxignore`` folder will be skipped.
18+
"""
19+
job_dir = cfg.get("job_dir")
20+
if job_dir is None:
21+
return
22+
assert isinstance(job_dir, str), "job_dir must be str"
23+
24+
os.mkdir(job_dir)
25+
_copy_to_dir(workspace, job_dir)
26+
role.image = job_dir
27+
28+
29+
def _copy_to_dir(workspace: str, target: str) -> None:
30+
fs, path = fsspec.core.url_to_fs(workspace)
31+
assert isinstance(path, str), "path must be str"
32+
33+
for dir, dirs, files in walk_workspace(fs, path):
34+
assert isinstance(dir, str), "path must be str"
35+
relpath = posixpath.relpath(dir, path)
36+
for file, info in files.items():
37+
filepath = posixpath.join(
38+
target,
39+
posixpath.join(relpath, file) if relpath != "." else file,
40+
)
41+
with fs.open(info["name"], "rb") as src, fsspec.open(filepath, "wb") as dst:
42+
shutil.copyfileobj(src, dst)
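
Finally, a small sketch of how ``DirWorkspace`` snapshots a workspace into a fresh ``job_dir`` (paths and the role name are illustrative; ``job_dir`` must not already exist, since ``os.mkdir`` raises otherwise):

```python
import os
import tempfile

from torchx.specs import Role
from torchx.workspace.dir_workspace import DirWorkspace

with tempfile.TemporaryDirectory() as tmp:
    workspace = os.path.join(tmp, "project")
    os.mkdir(workspace)
    with open(os.path.join(workspace, "main.py"), "w") as f:
        f.write('print("hello world!")\n')

    job_dir = os.path.join(tmp, "job")  # created by build_workspace_and_update_role
    role = Role(name="trainer", image=workspace)

    DirWorkspace().build_workspace_and_update_role(role, workspace, {"job_dir": job_dir})

    print(role.image)           # the role now points at the isolated job_dir
    print(os.listdir(job_dir))  # ['main.py'] — a snapshot of the workspace
```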
