Crane Library
Sync client class for crane users.
Attributes:
Name | Type | Description |
---|---|---|
job |
JobCommandCollection |
job commands |
mini_cluster |
MiniClusterCommandCollection |
Mini cluster commands |
resource |
ResourceCommandCollection |
Resource commands |
user |
UserCommandCollection |
User commands |
superuser |
SuperuserCommandCollection |
Superuser commands |
ws |
WorkspaceCommandCollection |
Workspace commands |
__class__
inherited
¶
Metaclass for defining Abstract Base Classes (ABCs).
Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).
__instancecheck__(cls, instance)
special
¶
Override for isinstance(instance, cls).
Source code in crane/lib/sync/user/manager.py
def __instancecheck__(cls, instance):
    """Hook invoked for ``isinstance(instance, cls)``.

    The decision is delegated to ``_abc_instancecheck``.
    """
    verdict = _abc_instancecheck(cls, instance)
    return verdict
__new__(mcls, name, bases, namespace, **kwargs)
special
staticmethod
¶
Create and return a new object. See help(type) for accurate signature.
Source code in crane/lib/sync/user/manager.py
def __new__(mcls, name, bases, namespace, **kwargs):
    """Build the class via the parent metaclass, then run ``_abc_init`` on it."""
    new_class = super().__new__(mcls, name, bases, namespace, **kwargs)
    _abc_init(new_class)
    return new_class
__subclasscheck__(cls, subclass)
special
¶
Override for issubclass(subclass, cls).
Source code in crane/lib/sync/user/manager.py
def __subclasscheck__(cls, subclass):
    """Hook invoked for ``issubclass(subclass, cls)``.

    The decision is delegated to ``_abc_subclasscheck``.
    """
    verdict = _abc_subclasscheck(cls, subclass)
    return verdict
register(cls, subclass)
¶
Register a virtual subclass of an ABC.
Returns the subclass, to allow usage as a class decorator.
Source code in crane/lib/sync/user/manager.py
def register(cls, subclass):
    """Register ``subclass`` as a virtual subclass of this ABC.

    The subclass is handed back so the method can be used as a class
    decorator.
    """
    registered = _abc_register(cls, subclass)
    return registered
config_template
¶
Configuration for crane user client.
access_token: Any
property
writable
¶
Configuration access_token[typing.Optional[str]]
refresh_token: Any
property
writable
¶
Configuration refresh_token[typing.Optional[str]]
url: Any
property
writable
¶
Configuration url[
Meta
¶
Prefix.
save(self)
inherited
¶
Save configuration.
Atomic write via write & rename.
Source code in crane/lib/sync/user/manager.py
def save(self) -> None:
    """Write the stored configuration to its file as JSON.

    The file is produced through ``atomic_write`` (write, then rename).
    """
    destination = self._file_path
    with atomic_write(destination) as handle:
        json.dump(self._config, handle)
to_dict(self)
inherited
¶
Return original configuration dictionary.
Source code in crane/lib/sync/user/manager.py
def to_dict(self) -> Mapping[str, Any]:
    """Return the effective configuration as a plain dictionary.

    Values are resolved through a ``ChainMap``: environment overrides win
    over the stored configuration, which wins over the defaults.
    """
    fallback = getattr(self, "_defaults")
    layered = ChainMap(self._env_overrides, self._config, fallback)
    return dict(layered)
__init__(self, session, config)
special
¶
Initialize.
Source code in crane/lib/sync/user/manager.py
def __init__(self, session: SyncHTTPClient, config: UserLibConfig) -> None:
    """Set up the client and attach its command collections.

    When the configuration carries an access token, it is installed on the
    session as a ``Bearer`` Authorization header.
    """
    super().__init__(session, config)
    # TODO: implement session that prefixes session
    self.mini_cluster = MiniClusterCommandCollection(self)
    self.resource = ResourceCommandCollection(self)
    self.user = UserCommandCollection(self)
    self.ws = WorkspaceCommandCollection(self)
    # Job commands are built on top of the mini-cluster collection.
    self.job = JobCommandCollection(self.mini_cluster)

    if self.config.access_token is None:
        return
    auth_header = {"Authorization": f"Bearer {self.config.access_token}"}
    self.session.headers.update(auth_header)
close(self)
inherited
¶
Close client.
Source code in crane/lib/sync/user/manager.py
def close(self) -> None:
    """Close the client by closing its underlying session."""
    session = self.session
    session.close()
is_healthy(self)
inherited
¶
Return True if the server responds to a ping.
Source code in crane/lib/sync/user/manager.py
def is_healthy(self) -> bool:
    """Return True if the server responds to a ping.

    Returns:
        bool: False when ``ping`` raises ``HTTPBadResponseError`` or
        ``HTTPError``, True when it completes.
    """
    try:
        self.ping()
    except (HTTPBadResponseError, HTTPError):
        return False
    else:
        return True
ping(self)
¶
Ping... and pong.
Source code in crane/lib/sync/user/manager.py
def ping(self) -> None:
    """Ping the gateway and require a 200 response.

    The request is sent with ``timeout=1``.
    """
    endpoint = f"/gateway{C.Gateway.URL.PING}"
    response = self.session.get(endpoint, timeout=1)
    assert_response(response, 200)
Async client class for crane users.
Attributes:
Name | Type | Description |
---|---|---|
job |
JobCommandCollection |
job commands |
mini_cluster |
MiniClusterCommandCollection |
Mini cluster commands |
resource |
ResourceCommandCollection |
Resource commands |
user |
UserCommandCollection |
User commands |
superuser |
SuperuserCommandCollection |
Superuser commands |
ws |
WorkspaceCommandCollection |
Workspace commands |
__class__
inherited
¶
Metaclass for defining Abstract Base Classes (ABCs).
Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).
__instancecheck__(cls, instance)
special
¶
Override for isinstance(instance, cls).
Source code in crane/lib/aio/user/manager.py
def __instancecheck__(cls, instance):
    """Answer ``isinstance(instance, cls)`` by asking ``_abc_instancecheck``."""
    outcome = _abc_instancecheck(cls, instance)
    return outcome
__new__(mcls, name, bases, namespace, **kwargs)
special
staticmethod
¶
Create and return a new object. See help(type) for accurate signature.
Source code in crane/lib/aio/user/manager.py
def __new__(mcls, name, bases, namespace, **kwargs):
    """Create the class object and pass it through ``_abc_init``."""
    created = super().__new__(mcls, name, bases, namespace, **kwargs)
    _abc_init(created)
    return created
__subclasscheck__(cls, subclass)
special
¶
Override for issubclass(subclass, cls).
Source code in crane/lib/aio/user/manager.py
def __subclasscheck__(cls, subclass):
    """Answer ``issubclass(subclass, cls)`` by asking ``_abc_subclasscheck``."""
    outcome = _abc_subclasscheck(cls, subclass)
    return outcome
register(cls, subclass)
¶
Register a virtual subclass of an ABC.
Returns the subclass, to allow usage as a class decorator.
Source code in crane/lib/aio/user/manager.py
def register(cls, subclass):
    """Register a virtual subclass of an ABC.

    Returns:
        The value produced by ``_abc_register``, so the method can be used
        as a class decorator.
    """
    result = _abc_register(cls, subclass)
    return result
config_template
¶
Configuration for crane user client.
access_token: Any
property
writable
¶
Configuration access_token[typing.Optional[str]]
refresh_token: Any
property
writable
¶
Configuration refresh_token[typing.Optional[str]]
url: Any
property
writable
¶
Configuration url[
Meta
¶
Prefix.
save(self)
inherited
¶
Save configuration.
Atomic write via write & rename.
Source code in crane/lib/aio/user/manager.py
def save(self) -> None:
    """Save the configuration as JSON.

    Goes through ``atomic_write`` (write & rename).
    """
    config_path = self._file_path
    with atomic_write(config_path) as out:
        json.dump(self._config, out)
to_dict(self)
inherited
¶
Return original configuration dictionary.
Source code in crane/lib/aio/user/manager.py
def to_dict(self) -> Mapping[str, Any]:
    """Flatten the layered configuration into one dictionary.

    Precedence, highest first: ``_env_overrides``, ``_config``,
    ``_defaults``.
    """
    base_values = getattr(self, "_defaults")
    merged = ChainMap(self._env_overrides, self._config, base_values)
    return dict(merged)
__init__(self, session, config)
special
¶
Initialize.
Source code in crane/lib/aio/user/manager.py
def __init__(self, session: AsyncHTTPClient, config: UserLibConfig) -> None:
    """Set up the async client and attach its command collections.

    When the configuration carries an access token, it is installed on the
    session as a ``Bearer`` Authorization header.
    """
    super().__init__(session, config)
    # TODO: implement session that prefixes session
    self.mini_cluster = MiniClusterCommandCollection(self)
    self.resource = ResourceCommandCollection(self)
    self.user = UserCommandCollection(self)
    self.ws = WorkspaceCommandCollection(self)
    # Job commands are built on top of the mini-cluster collection.
    self.job = JobCommandCollection(self.mini_cluster)

    if self.config.access_token is None:
        return
    auth_header = {"Authorization": f"Bearer {self.config.access_token}"}
    self.session.headers.update(auth_header)
close(self)
async
inherited
¶
Close client.
Source code in crane/lib/aio/user/manager.py
async def close(self) -> None:
    """Close the client by awaiting ``aclose()`` on its session."""
    session = self.session
    await session.aclose()
is_healthy(self)
async
inherited
¶
Return True if the server responds to a ping.
Source code in crane/lib/aio/user/manager.py
async def is_healthy(self) -> bool:
    """Return True if the server responds to a ping.

    Returns:
        bool: False when ``ping`` raises ``HTTPBadResponseError`` or
        ``HTTPError``, True when it completes.
    """
    try:
        await self.ping()
    except (HTTPBadResponseError, HTTPError):
        return False
    else:
        return True
ping(self)
async
¶
Ping... and pong.
Source code in crane/lib/aio/user/manager.py
async def ping(self) -> None:
    """Ping the gateway and require a 200 response.

    The request is sent with ``timeout=1``.
    """
    endpoint = f"/gateway{C.Gateway.URL.PING}"
    response = await self.session.get(endpoint, timeout=1)
    assert_response(response, 200)
Job subcommands.
__init__(self, mini_cluster)
special
¶
Initialize.
Source code in crane/lib/aio/user/job.py
def __init__(self, mini_cluster: MiniClusterCommandCollection) -> None:
    """Keep the mini-cluster collection that the job commands delegate to."""
    self.mini_cluster = mini_cluster
add(self, *args, **kwargs)
async
¶
Add a new job to cluster.
Source code in crane/lib/aio/user/job.py
async def add(self, *args, **kwargs) -> MCCreateResponse:
    """Add a new job by forwarding all arguments to ``mini_cluster.add``."""
    # TODO: args will be added later. Removed now to avoid lint error
    created = await self.mini_cluster.add(*args, **kwargs)
    return created
delete(self, job_id)
async
¶
Remove a job by its id.
Source code in crane/lib/aio/user/job.py
async def delete(self, job_id: str) -> None:
    """Remove a job by its id (delegates to ``mini_cluster.delete``)."""
    outcome = await self.mini_cluster.delete(job_id)
    return outcome
filter(self, job_id_or_name, tags=None, state=None)
async
¶
Filter jobs by id, name or tags.
Source code in crane/lib/aio/user/job.py
async def filter(
    self,
    job_id_or_name: str,
    tags: list[str] | None = None,
    state: list[str] | None = None,
) -> list[str]:
    """Filter jobs by id, name or tags (delegates to ``mini_cluster.filter``)."""
    matches = await self.mini_cluster.filter(job_id_or_name, tags, state)
    return matches
inspect(self, job_id)
async
¶
Inspect a job by its id.
Source code in crane/lib/aio/user/job.py
async def inspect(self, job_id: str) -> MCInspectResponse:
    """Inspect a job by its id (delegates to ``mini_cluster.inspect``)."""
    details = await self.mini_cluster.inspect(job_id)
    return details
kill(self, job_id, force)
async
¶
Kill a job by its id.
Source code in crane/lib/aio/user/job.py
async def kill(self, job_id: str, force: bool) -> None:
    """Kill a job by its id (delegates to ``mini_cluster.kill``)."""
    outcome = await self.mini_cluster.kill(job_id, force)
    return outcome
log(self, job_id, follow=False, filter_=LogFilter(source=None, since=None))
¶
Iterate through a log.
Source code in crane/lib/aio/user/job.py
async def log(
    self,
    job_id: str,
    follow: bool = False,
    filter_: log_model.LogFilter = log_model.LogFilter(),
) -> AsyncIterator[log_model.Log]:
    """Yield the log entries of a job, as produced by ``mini_cluster.log``."""
    entries = self.mini_cluster.log(job_id, follow, filter_)
    async for entry in entries:
        yield entry
Mini-cluster (job) subcommands.
add(self, image, resource, command=None, envs=None, mounts=None, ports=None, public_ports=None, rdma=False, overlay=False, shm_size='64m', name=None, tags=None, init=False, workspace_id=None)
async
¶
Add a new job to cluster.
Source code in crane/lib/aio/user/mini_cluster.py
async def add(
    self,
    image: str,
    resource: str,
    command: str | None = None,
    envs: list[str] | None = None,
    mounts: list[str] | None = None,
    ports: list[int] | None = None,
    public_ports: list[int] | None = None,
    rdma: bool = False,
    overlay: bool = False,
    shm_size: str = "64m",
    name: str | None = None,
    tags: list[str] | None = None,
    init: bool = False,
    workspace_id: str | None = None,
) -> MCCreateResponse:
    """Add a new job to cluster.

    The arguments are assembled into an ``MCCreateRequest`` and POSTed to
    the gateway's mini-cluster list endpoint; a 201 response is required.

    Returns:
        MCCreateResponse: built from the JSON body of the response.
    """
    # Omitted list arguments fall back to empty lists.
    env_entries = envs or []
    mount_entries = mounts or []
    inner_ports = ports or []
    outer_ports = public_ports or []

    # Container-level configuration.
    spec = _build_resource(resource)
    image_cfg = container.Image(name=image)
    process_cfg = container.Process(
        cmd=command, envs=_build_envs(env_entries), init=init
    )
    storage_cfg = container.Storage(mounts=_build_mounts(mount_entries))
    network_cfg = container.Network(
        rdma=rdma, ports=_build_ports(inner_ports, outer_ports)
    )
    host_cfg = container.Host(shm_size=_build_shm_size(shm_size))
    container_cfg = container.Config(
        image=image_cfg,
        resource_spec=spec.am_resource,
        process=process_cfg,
        storage=storage_cfg,
        network=network_cfg,
        host=host_cfg,
    )

    # Mini-cluster-level configuration wrapping the container config.
    cluster_setting = mini_cluster.MiniClusterSetting(overlay=overlay)
    cluster_cfg = mini_cluster.Config(
        app_manager=container_cfg,
        resource_spec=spec,
        mini_cluster_setting=cluster_setting,
    )
    request = MCCreateRequest(
        config=cluster_cfg, name=name, workspace_id=workspace_id, tags=tags or []
    )

    endpoint = f"/gateway{C.Gateway.URL.MINI_CLUSTER_LIST}"
    response = await self._session.post(endpoint, json=request.to_dict())
    assert_response(response, 201)
    return MCCreateResponse.from_dict(response.json())
delete(self, mc_id)
async
¶
Remove a mini-cluster by its id.
Source code in crane/lib/aio/user/mini_cluster.py
async def delete(self, mc_id: str) -> None:
    """Remove a mini-cluster by its id."""
    detail_path = C.Gateway.URL.MINI_CLUSTER_DETAIL.format(mc_id=mc_id)
    response = await self._session.delete(f"/gateway{detail_path}")
    assert_response(response)
filter(self, id_or_name, tags=None, state=None)
async
¶
Filter jobs by id, name or tags.
Source code in crane/lib/aio/user/mini_cluster.py
async def filter(
    self,
    id_or_name: str,
    tags: list[str] | None = None,
    state: list[str] | None = None,
) -> list[str]:
    """Filter jobs by id, name or tags.

    ``tags`` and ``state`` are sent as empty lists when omitted. The decoded
    JSON body of the response is returned as-is.
    """
    query = {"id_or_name": id_or_name, "tags": tags or [], "state": state or []}
    endpoint = f"/gateway{C.Gateway.URL.MINI_CLUSTER_LIST}"
    response = await self._session.get(endpoint, params=query)
    assert_response(response)
    return response.json()
inspect(self, job_id)
async
¶
Inspect a job by its id.
Source code in crane/lib/aio/user/mini_cluster.py
async def inspect(self, job_id: str) -> MCInspectResponse:
    """Inspect a job by its id.

    Returns:
        MCInspectResponse: parsed from the response text.
    """
    detail_path = C.Gateway.URL.MINI_CLUSTER_DETAIL.format(mc_id=job_id)
    response = await self._session.get(f"/gateway{detail_path}")
    assert_response(response)
    return MCInspectResponse.from_json(response.text)
kill(self, mc_id, force)
async
¶
Kill a mini-cluster by its id.
Source code in crane/lib/aio/user/mini_cluster.py
async def kill(self, mc_id: str, force: bool) -> None:
    """Kill a mini-cluster by its id.

    ``force`` is sent as the query parameter ``force`` with the string value
    "true" or "false".
    """
    force_flag = "true" if force else "false"
    kill_path = C.Gateway.URL.MINI_CLUSTER_KILL.format(mc_id=mc_id)
    response = await self._session.post(
        f"/gateway{kill_path}", params={"force": force_flag}
    )
    assert_response(response)
log(self, mc_id, follow=False, filter_=LogFilter(source=None, since=None))
¶
Iterate through a log.
Source code in crane/lib/aio/user/mini_cluster.py
async def log(
    self,
    mc_id: str,
    follow: bool = False,
    filter_: log_model.LogFilter = log_model.LogFilter(),
) -> AsyncIterator[log_model.Log]:
    """Iterate through the log of a mini-cluster.

    Opens a streaming GET request against the job-log endpoint and yields
    one ``log_model.Log`` per non-empty line of the response.

    Args:
        mc_id (str): id formatted into the log URL.
        follow (bool): sent as the ``follow`` query parameter.
        filter_ (log_model.LogFilter): serialized with ``to_dict()`` and
            sent as the JSON body of the request.

    Yields:
        log_model.Log: parsed from each non-empty line.
    """
    params: dict[str, Any] = dict(follow=follow)
    body = filter_.to_dict()
    url = f"/gateway{C.Gateway.URL.JOB_LOG.format(mc_id=mc_id)}"
    # The response is consumed with ``aiter_lines()``, i.e. this is the async
    # client's stream; it has to be entered with ``async with`` rather than a
    # plain ``with``.
    async with self._session.stream("GET", url, params=params, json=body) as res:
        assert_response(res)
        async for line in res.aiter_lines():
            # Filter keep-alive
            if line:
                yield log_model.Log.from_json(line)
Commands for resource.
cluster_resource(self)
async
¶
Query resource state.
Returns:
Type | Description |
---|---|
resource.PhysicalAllocationCluster |
resource.PhysicalAllocationCluster: cluster resource state |
Exceptions:
Type | Description |
---|---|
HTTPBadResponseError |
if status code is not expected |
HTTPConnectionError |
if connection fails |
Source code in crane/lib/aio/user/resource.py
async def cluster_resource(self) -> resource.PhysicalAllocationCluster:
    """Query resource state.

    Returns:
        resource.PhysicalAllocationCluster: cluster resource state

    Raises:
        HTTPBadResponseError: if status code is not expected
        HTTPConnectionError: if connection fails
    """
    endpoint = f"/gateway{C.Gateway.URL.CLUSTER_RESOURCE}"
    response = await self._session.get(endpoint)
    assert_response(response)
    payload = response.json()
    return resource.PhysicalAllocationCluster.from_dict(payload)
Commands for session management.
sign_in(self, browser=True)
async
¶
Sign in to crane.
TODO: add other authorization strategies for scripts.
TODO: move console prints to cli level.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
browser |
bool |
If true, opens the browser. Defaults to True. |
True |
Source code in crane/lib/aio/user/user.py
async def sign_in(self, browser: bool = True) -> None:
    """Sign in to crane through the device login flow.

    TODO: add other authorization strategies for scripts
    TODO: move console prints to cli level.

    Args:
        browser (bool): If true, opens the browser. Defaults to True.

    Raises:
        OSError: if ``browser`` is true and no web browser could be opened.
    """
    # Step 1: initiate the device login flow.
    res = await self._session.get(f"/gateway{C.Gateway.URL.AUTH_DEVICE}")
    assert_response(res)
    flow = user.DeviceFlowInfo.from_json(res.text)

    # Step 2: send the user to the login page.
    if not browser:
        print("Go to the link below in a web browser.", flush=True)
        print(flow.redirect_uri, flush=True)
    else:
        input("Press ENTER to open login page in your browser.")
        opened = webbrowser.open(flow.redirect_uri)
        if not opened:
            # TODO: need better exception class.
            raise OSError("No web browser found.")

    # Step 3: poll the token endpoint until the login is completed.
    console = Console()
    token_request = user.TokenRequest(device_code=flow.device_code)
    with console.status(
        "[bold green]Checking for authorization results...", spinner="pong"
    ):
        # NOTE(review): if the sequence ends without a 200, the last response
        # is still parsed as a token below -- confirm generate_sequence is
        # unbounded.
        for pause in generate_sequence("fibonacci", init=2, upper=10):
            await async_sleep(pause)
            res = await self._session.post(
                f"/gateway{C.Gateway.URL.AUTH_DEVICE_TOKEN}",
                json=token_request.to_dict(),
            )
            # 429 is tolerated while authorization is still pending.
            assert_response(res, (200, 429))
            if res.status_code == 200:
                break

    # Step 4: persist the issued tokens.
    token = user.Token.from_json(res.text)
    config = self._config
    config.access_token = token.access_token
    config.refresh_token = token.refresh_token
    config.save()
sign_out(self)
async
¶
Sign out user.
Exceptions:
Type | Description |
---|---|
HTTPBadResponseError |
if status code is not expected |
ConnectError |
if connection fails |
Source code in crane/lib/aio/user/user.py
async def sign_out(self) -> None:
    """Sign out user.

    Clears the access and refresh tokens in the configuration and saves it.

    NOTE(review): the previous docstring listed ``HTTPBadResponseError`` and
    ``ConnectError`` under Raises, but this method issues no request --
    confirm whether those can actually occur.
    """
    config = self._config
    config.access_token = config.refresh_token = None
    config.save()
whoami(self)
async
¶
Return public user info about this user.
Returns:
Type | Description |
---|---|
user.UserInfo |
user.UserInfo: user info |
Exceptions:
Type | Description |
---|---|
HTTPBadResponseError |
if status code is not expected |
ConnectError |
if connection fails |
Source code in crane/lib/aio/user/user.py
async def whoami(self) -> user.UserInfo:
    """Return public user info about this user.

    Returns:
        user.UserInfo: user info

    Raises:
        HTTPBadResponseError: if status code is not expected
        ConnectError: if connection fails
    """
    endpoint = f"/gateway{C.Gateway.URL.USER_DETAIL}"
    response = await self._session.get(endpoint)
    assert_response(response)
    return user.UserInfo.from_json(response.text)
Commands for workspaces.
add(self)
async
¶
Zip workspace and send the tarball to server.
Returns:
Type | Description |
---|---|
str |
context_id (str): id of newly created context |
Source code in crane/lib/aio/user/workspace.py
async def add(self) -> str:
    """Zip workspace and send the tarball to server.

    The current working directory is archived with ``zip_workspace`` and
    uploaded as a multipart file; a 201 response is required.

    Returns:
        context_id (str): id of newly created context
    """
    archive_path = zip_workspace(Path.cwd())
    endpoint = f"/workspace{C.WsServer.URL.WORKSPACE}"
    with open(archive_path, "rb") as archive:
        upload = {"file": ("file", archive, "application/x-tar")}
        response = await self._session.post(endpoint, files=upload)
    assert_response(response, 201)
    # TODO: remove the archive file once it has been uploaded
    return response.json()
init(self)
async
¶
Initialize crane workspace in git-like manner.
After the init function runs, the directory will look like:
/
|_ .crane/
|_ config.yaml
|_ .craneignore
|_ {workspace_id}.tar
Returns:
Type | Description |
---|---|
bool |
bool: whether re-initialize |
Source code in crane/lib/aio/user/workspace.py
async def init(self) -> bool:
"""Initialize crane workspace in git-like manner.
After init function, the directory will look like,
/
|_ .crane/
|_ config.yaml
|_ .craneignore
|_ {workspace_id}.tar
Returns:
bool: True if the config file already existed (re-initialization),
False otherwise.
"""
# The config file lives at <cwd>/<CONTEXT_DIR>/<CONFIG_PATH>.
context_path = Path.cwd() / C.Workspace.CONTEXT_DIR
config_path = context_path / C.Workspace.CONFIG_PATH
# The workspace counts as already initialized when the config file exists.
reinit_flag = config_path.exists()
if not reinit_flag:
os.makedirs(context_path, exist_ok=True)
default_config = {"container_path": _default_container_path()}
with open(config_path, "w", encoding="utf-8") as f:
# TODO: implement more on config file
yaml.dump(default_config, f)
return reinit_flag
is_workspace_server_alive(self)
async
¶
Check if ws server is alive.
Source code in crane/lib/aio/user/workspace.py
async def is_workspace_server_alive(self) -> bool:
    """Check if the workspace server answers its ping endpoint with 200.

    NOTE(review): nothing is caught here, so request errors presumably
    propagate to the caller -- confirm that is intended.
    """
    endpoint = f"/workspace{C.WsServer.URL.PING}"
    response = await self._session.get(endpoint, timeout=1)
    return response.status_code == 200
Job subcommands.
__init__(self, mini_cluster)
special
¶
Initialize.
Source code in crane/lib/sync/user/job.py
def __init__(self, mini_cluster: MiniClusterCommandCollection) -> None:
    """Keep the mini-cluster collection that the job commands delegate to."""
    self.mini_cluster = mini_cluster
add(self, *args, **kwargs)
¶
Add a new job to cluster.
Source code in crane/lib/sync/user/job.py
def add(self, *args, **kwargs) -> MCCreateResponse:
    """Add a new job by forwarding all arguments to ``mini_cluster.add``."""
    # TODO: args will be added later. Removed now to avoid lint error
    created = self.mini_cluster.add(*args, **kwargs)
    return created
delete(self, job_id)
¶
Remove a job by its id.
Source code in crane/lib/sync/user/job.py
def delete(self, job_id: str) -> None:
    """Remove a job by its id (delegates to ``mini_cluster.delete``)."""
    outcome = self.mini_cluster.delete(job_id)
    return outcome
filter(self, job_id_or_name, tags=None, state=None)
¶
Filter jobs by id, name or tags.
Source code in crane/lib/sync/user/job.py
def filter(
    self,
    job_id_or_name: str,
    tags: list[str] | None = None,
    state: list[str] | None = None,
) -> list[str]:
    """Filter jobs by id, name or tags (delegates to ``mini_cluster.filter``)."""
    matches = self.mini_cluster.filter(job_id_or_name, tags, state)
    return matches
inspect(self, job_id)
¶
Inspect a job by its id.
Source code in crane/lib/sync/user/job.py
def inspect(self, job_id: str) -> MCInspectResponse:
    """Inspect a job by its id (delegates to ``mini_cluster.inspect``)."""
    details = self.mini_cluster.inspect(job_id)
    return details
kill(self, job_id, force)
¶
Kill a job by its id.
Source code in crane/lib/sync/user/job.py
def kill(self, job_id: str, force: bool) -> None:
    """Kill a job by its id (delegates to ``mini_cluster.kill``)."""
    outcome = self.mini_cluster.kill(job_id, force)
    return outcome
log(self, job_id, follow=False, filter_=LogFilter(source=None, since=None))
¶
Iterate through a log.
Source code in crane/lib/sync/user/job.py
def log(
    self,
    job_id: str,
    follow: bool = False,
    filter_: log_model.LogFilter = log_model.LogFilter(),
) -> Iterator[log_model.Log]:
    """Yield the log entries of a job, as produced by ``mini_cluster.log``."""
    yield from self.mini_cluster.log(job_id, follow, filter_)
Mini-cluster (job) subcommands.
add(self, image, resource, command=None, envs=None, mounts=None, ports=None, public_ports=None, rdma=False, overlay=False, shm_size='64m', name=None, tags=None, init=False, workspace_id=None)
¶
Add a new job to cluster.
Source code in crane/lib/sync/user/mini_cluster.py
def add(
    self,
    image: str,
    resource: str,
    command: str | None = None,
    envs: list[str] | None = None,
    mounts: list[str] | None = None,
    ports: list[int] | None = None,
    public_ports: list[int] | None = None,
    rdma: bool = False,
    overlay: bool = False,
    shm_size: str = "64m",
    name: str | None = None,
    tags: list[str] | None = None,
    init: bool = False,
    workspace_id: str | None = None,
) -> MCCreateResponse:
    """Add a new job to cluster.

    The arguments are assembled into an ``MCCreateRequest`` and POSTed to
    the gateway's mini-cluster list endpoint; a 201 response is required.

    Returns:
        MCCreateResponse: built from the JSON body of the response.
    """
    # Omitted list arguments fall back to empty lists.
    env_entries = envs or []
    mount_entries = mounts or []
    inner_ports = ports or []
    outer_ports = public_ports or []

    # Container-level configuration.
    spec = _build_resource(resource)
    image_cfg = container.Image(name=image)
    process_cfg = container.Process(
        cmd=command, envs=_build_envs(env_entries), init=init
    )
    storage_cfg = container.Storage(mounts=_build_mounts(mount_entries))
    network_cfg = container.Network(
        rdma=rdma, ports=_build_ports(inner_ports, outer_ports)
    )
    host_cfg = container.Host(shm_size=_build_shm_size(shm_size))
    container_cfg = container.Config(
        image=image_cfg,
        resource_spec=spec.am_resource,
        process=process_cfg,
        storage=storage_cfg,
        network=network_cfg,
        host=host_cfg,
    )

    # Mini-cluster-level configuration wrapping the container config.
    cluster_setting = mini_cluster.MiniClusterSetting(overlay=overlay)
    cluster_cfg = mini_cluster.Config(
        app_manager=container_cfg,
        resource_spec=spec,
        mini_cluster_setting=cluster_setting,
    )
    request = MCCreateRequest(
        config=cluster_cfg, name=name, workspace_id=workspace_id, tags=tags or []
    )

    endpoint = f"/gateway{C.Gateway.URL.MINI_CLUSTER_LIST}"
    response = self._session.post(endpoint, json=request.to_dict())
    assert_response(response, 201)
    return MCCreateResponse.from_dict(response.json())
delete(self, mc_id)
¶
Remove a mini-cluster by its id.
Source code in crane/lib/sync/user/mini_cluster.py
def delete(self, mc_id: str) -> None:
    """Remove a mini-cluster by its id."""
    detail_path = C.Gateway.URL.MINI_CLUSTER_DETAIL.format(mc_id=mc_id)
    response = self._session.delete(f"/gateway{detail_path}")
    assert_response(response)
filter(self, id_or_name, tags=None, state=None)
¶
Filter jobs by id, name or tags.
Source code in crane/lib/sync/user/mini_cluster.py
def filter(
    self,
    id_or_name: str,
    tags: list[str] | None = None,
    state: list[str] | None = None,
) -> list[str]:
    """Filter jobs by id, name or tags.

    ``tags`` and ``state`` are sent as empty lists when omitted. The decoded
    JSON body of the response is returned as-is.
    """
    query = {"id_or_name": id_or_name, "tags": tags or [], "state": state or []}
    endpoint = f"/gateway{C.Gateway.URL.MINI_CLUSTER_LIST}"
    response = self._session.get(endpoint, params=query)
    assert_response(response)
    return response.json()
inspect(self, job_id)
¶
Inspect a job by its id.
Source code in crane/lib/sync/user/mini_cluster.py
def inspect(self, job_id: str) -> MCInspectResponse:
    """Inspect a job by its id.

    Returns:
        MCInspectResponse: parsed from the response text.
    """
    detail_path = C.Gateway.URL.MINI_CLUSTER_DETAIL.format(mc_id=job_id)
    response = self._session.get(f"/gateway{detail_path}")
    assert_response(response)
    return MCInspectResponse.from_json(response.text)
kill(self, mc_id, force)
¶
Kill a mini-cluster by its id.
Source code in crane/lib/sync/user/mini_cluster.py
def kill(self, mc_id: str, force: bool) -> None:
    """Kill a mini-cluster by its id.

    ``force`` is sent as the query parameter ``force`` with the string value
    "true" or "false".
    """
    force_flag = "true" if force else "false"
    kill_path = C.Gateway.URL.MINI_CLUSTER_KILL.format(mc_id=mc_id)
    response = self._session.post(
        f"/gateway{kill_path}", params={"force": force_flag}
    )
    assert_response(response)
log(self, mc_id, follow=False, filter_=LogFilter(source=None, since=None))
¶
Iterate through a log.
Source code in crane/lib/sync/user/mini_cluster.py
def log(
    self,
    mc_id: str,
    follow: bool = False,
    filter_: log_model.LogFilter = log_model.LogFilter(),
) -> Iterator[log_model.Log]:
    """Iterate through the log of a mini-cluster.

    Opens a streaming GET request against the job-log endpoint and yields
    one ``log_model.Log`` per non-empty line of the response. ``follow`` is
    sent as a query parameter and ``filter_`` as the JSON body.
    """
    query: dict[str, Any] = {"follow": follow}
    payload = filter_.to_dict()
    log_path = C.Gateway.URL.JOB_LOG.format(mc_id=mc_id)
    with self._session.stream(
        "GET", f"/gateway{log_path}", params=query, json=payload
    ) as response:
        assert_response(response)
        for raw_line in response.iter_lines():
            # Empty lines are keep-alives; skip them.
            if not raw_line:
                continue
            yield log_model.Log.from_json(raw_line)
Commands for resource.
cluster_resource(self)
¶
Query resource state.
Returns:
Type | Description |
---|---|
resource.PhysicalAllocationCluster |
resource.PhysicalAllocationCluster: cluster resource state |
Exceptions:
Type | Description |
---|---|
HTTPBadResponseError |
if status code is not expected |
HTTPConnectionError |
if connection fails |
Source code in crane/lib/sync/user/resource.py
def cluster_resource(self) -> resource.PhysicalAllocationCluster:
    """Query resource state.

    Returns:
        resource.PhysicalAllocationCluster: cluster resource state

    Raises:
        HTTPBadResponseError: if status code is not expected
        HTTPConnectionError: if connection fails
    """
    endpoint = f"/gateway{C.Gateway.URL.CLUSTER_RESOURCE}"
    response = self._session.get(endpoint)
    assert_response(response)
    payload = response.json()
    return resource.PhysicalAllocationCluster.from_dict(payload)
Commands for session management.
sign_in(self, browser=True)
¶
Sign in to crane.
TODO: add other authorization strategies for scripts.
TODO: move console prints to cli level.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
browser |
bool |
If true, opens the browser. Defaults to True. |
True |
Source code in crane/lib/sync/user/user.py
def sign_in(self, browser: bool = True) -> None:
    """Sign in to crane through the device login flow.

    TODO: add other authorization strategies for scripts
    TODO: move console prints to cli level.

    Args:
        browser (bool): If true, opens the browser. Defaults to True.

    Raises:
        OSError: if ``browser`` is true and no web browser could be opened.
    """
    # Step 1: initiate the device login flow.
    res = self._session.get(f"/gateway{C.Gateway.URL.AUTH_DEVICE}")
    assert_response(res)
    flow = user.DeviceFlowInfo.from_json(res.text)

    # Step 2: send the user to the login page.
    if not browser:
        print("Go to the link below in a web browser.", flush=True)
        print(flow.redirect_uri, flush=True)
    else:
        input("Press ENTER to open login page in your browser.")
        opened = webbrowser.open(flow.redirect_uri)
        if not opened:
            # TODO: need better exception class.
            raise OSError("No web browser found.")

    # Step 3: poll the token endpoint until the login is completed.
    console = Console()
    token_request = user.TokenRequest(device_code=flow.device_code)
    with console.status(
        "[bold green]Checking for authorization results...", spinner="pong"
    ):
        # NOTE(review): if the sequence ends without a 200, the last response
        # is still parsed as a token below -- confirm generate_sequence is
        # unbounded.
        for pause in generate_sequence("fibonacci", init=2, upper=10):
            sync_sleep(pause)
            res = self._session.post(
                f"/gateway{C.Gateway.URL.AUTH_DEVICE_TOKEN}",
                json=token_request.to_dict(),
            )
            # 429 is tolerated while authorization is still pending.
            assert_response(res, (200, 429))
            if res.status_code == 200:
                break

    # Step 4: persist the issued tokens.
    token = user.Token.from_json(res.text)
    config = self._config
    config.access_token = token.access_token
    config.refresh_token = token.refresh_token
    config.save()
sign_out(self)
¶
Sign out user.
Exceptions:
Type | Description |
---|---|
HTTPBadResponseError |
if status code is not expected |
ConnectError |
if connection fails |
Source code in crane/lib/sync/user/user.py
def sign_out(self) -> None:
    """Sign out user.

    Clears the access and refresh tokens in the configuration and saves it.

    NOTE(review): the previous docstring listed ``HTTPBadResponseError`` and
    ``ConnectError`` under Raises, but this method issues no request --
    confirm whether those can actually occur.
    """
    config = self._config
    config.access_token = config.refresh_token = None
    config.save()
whoami(self)
¶
Return public user info about this user.
Returns:
Type | Description |
---|---|
user.UserInfo |
user.UserInfo: user info |
Exceptions:
Type | Description |
---|---|
HTTPBadResponseError |
if status code is not expected |
ConnectError |
if connection fails |
Source code in crane/lib/sync/user/user.py
def whoami(self) -> user.UserInfo:
    """Return public user info about this user.

    Returns:
        user.UserInfo: user info

    Raises:
        HTTPBadResponseError: if status code is not expected
        ConnectError: if connection fails
    """
    endpoint = f"/gateway{C.Gateway.URL.USER_DETAIL}"
    response = self._session.get(endpoint)
    assert_response(response)
    return user.UserInfo.from_json(response.text)
Commands for workspaces.
add(self)
¶
Zip workspace and send the tarball to server.
Returns:
Type | Description |
---|---|
str |
context_id (str): id of newly created context |
Source code in crane/lib/sync/user/workspace.py
def add(self) -> str:
    """Zip workspace and send the tarball to server.

    The current working directory is archived with ``zip_workspace`` and
    uploaded as a multipart file; a 201 response is required.

    Returns:
        context_id (str): id of newly created context
    """
    archive_path = zip_workspace(Path.cwd())
    endpoint = f"/workspace{C.WsServer.URL.WORKSPACE}"
    with open(archive_path, "rb") as archive:
        upload = {"file": ("file", archive, "application/x-tar")}
        response = self._session.post(endpoint, files=upload)
    assert_response(response, 201)
    # TODO: remove the archive file once it has been uploaded
    return response.json()
init(self)
¶
Initialize crane workspace in git-like manner.
After the init function runs, the directory will look like:
/
|_ .crane/
|_ config.yaml
|_ .craneignore
|_ {workspace_id}.tar
Returns:
Type | Description |
---|---|
bool |
bool: whether re-initialize |
Source code in crane/lib/sync/user/workspace.py
def init(self) -> bool:
"""Initialize crane workspace in git-like manner.
After init function, the directory will look like,
/
|_ .crane/
|_ config.yaml
|_ .craneignore
|_ {workspace_id}.tar
Returns:
bool: True if the config file already existed (re-initialization),
False otherwise.
"""
# The config file lives at <cwd>/<CONTEXT_DIR>/<CONFIG_PATH>.
context_path = Path.cwd() / C.Workspace.CONTEXT_DIR
config_path = context_path / C.Workspace.CONFIG_PATH
# The workspace counts as already initialized when the config file exists.
reinit_flag = config_path.exists()
if not reinit_flag:
os.makedirs(context_path, exist_ok=True)
default_config = {"container_path": _default_container_path()}
with open(config_path, "w", encoding="utf-8") as f:
# TODO: implement more on config file
yaml.dump(default_config, f)
return reinit_flag
is_workspace_server_alive(self)
¶
Check if ws server is alive.
Source code in crane/lib/sync/user/workspace.py
def is_workspace_server_alive(self) -> bool:
    """Check if the workspace server answers its ping endpoint with 200.

    NOTE(review): nothing is caught here, so request errors presumably
    propagate to the caller -- confirm that is intended.
    """
    endpoint = f"/workspace{C.WsServer.URL.PING}"
    response = self._session.get(endpoint, timeout=1)
    return response.status_code == 200