Crane Library

Sync client class for crane users.

Attributes:

Name          Type                          Description
job           JobCommandCollection          Job commands
mini_cluster  MiniClusterCommandCollection  Mini cluster commands
resource      ResourceCommandCollection     Resource commands
user          UserCommandCollection         User commands
superuser     SuperuserCommandCollection    Superuser commands
ws            WorkspaceCommandCollection    Workspace commands

__class__ inherited

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

__instancecheck__(cls, instance) special

Override for isinstance(instance, cls).

Source code in crane/lib/sync/user/manager.py
def __instancecheck__(cls, instance):
    """Override for isinstance(instance, cls)."""
    return _abc_instancecheck(cls, instance)

__new__(mcls, name, bases, namespace, **kwargs) special staticmethod

Create and return a new object. See help(type) for accurate signature.

Source code in crane/lib/sync/user/manager.py
def __new__(mcls, name, bases, namespace, **kwargs):
    cls = super().__new__(mcls, name, bases, namespace, **kwargs)
    _abc_init(cls)
    return cls

__subclasscheck__(cls, subclass) special

Override for issubclass(subclass, cls).

Source code in crane/lib/sync/user/manager.py
def __subclasscheck__(cls, subclass):
    """Override for issubclass(subclass, cls)."""
    return _abc_subclasscheck(cls, subclass)

register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

Source code in crane/lib/sync/user/manager.py
def register(cls, subclass):
    """Register a virtual subclass of an ABC.

    Returns the subclass, to allow usage as a class decorator.
    """
    return _abc_register(cls, subclass)
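
The virtual-subclass behaviour described above can be seen with the standard library alone. This is a minimal sketch; `Closeable` and `LegacyConnection` are hypothetical names used only for illustration and are not part of crane.

```python
from abc import ABC


class Closeable(ABC):
    """Hypothetical ABC used only for this illustration."""


class LegacyConnection:
    """Unrelated class that does not inherit from Closeable."""

    def close(self) -> None:
        pass


# register() returns the class it was given, so it also works as a decorator.
returned = Closeable.register(LegacyConnection)

is_virtual_subclass = issubclass(LegacyConnection, Closeable)
is_virtual_instance = isinstance(LegacyConnection(), Closeable)
# The registering ABC does not appear in the MRO of the registered class.
in_mro = Closeable in LegacyConnection.__mro__
```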

config_template

Configuration for crane user client.

access_token: Any property writable

Configuration access_token[typing.Optional[str]]

refresh_token: Any property writable

Configuration refresh_token[typing.Optional[str]]

url: Any property writable

Configuration url[]

Meta

Prefix.

save(self) inherited

Save configuration.

Atomic write via write & rename.

Source code in crane/lib/sync/user/manager.py
def save(self) -> None:
    """Save configuration.

    Atomic write via write & rename.
    """
    with atomic_write(self._file_path) as f:
        json.dump(self._config, f)
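
The `atomic_write` helper is not shown on this page. As a rough sketch of the write-and-rename idea it refers to, the following uses only the standard library; `save_json_atomically` is a hypothetical name, and the real helper may differ in detail.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Mapping


def save_json_atomically(path: Path, data: Mapping[str, Any]) -> None:
    """Write data to a temp file next to path, then rename it over path."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(dict(data), f)
            f.flush()
            os.fsync(f.fileno())
        # os.replace is atomic when source and target share a filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Readers of the target file see either the old content or the new content, never a partially written file.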

to_dict(self) inherited

Return original configuration dictionary.

Source code in crane/lib/sync/user/manager.py
def to_dict(self) -> Mapping[str, Any]:
    """Return original configuration dictionary."""
    defaults = getattr(self, "_defaults")
    return dict(ChainMap(self._env_overrides, self._config, defaults))
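
`ChainMap` looks keys up from left to right, so in the call above environment overrides win over the saved configuration, which in turn wins over the defaults. A small sketch with made-up values (the URLs and token are placeholders):

```python
from collections import ChainMap

# Placeholder values for illustration only.
defaults = {"url": "https://example.invalid", "access_token": None}
config = {"access_token": "token-from-file"}
env_overrides = {"url": "https://override.invalid"}

# The first mapping that contains a key supplies its value.
merged = dict(ChainMap(env_overrides, config, defaults))
```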

__init__(self, session, config) special

Initialize.

Source code in crane/lib/sync/user/manager.py
def __init__(self, session: SyncHTTPClient, config: UserLibConfig) -> None:
    """Initialize."""
    super().__init__(session, config)

    # TODO: implement session that prefixes session
    self.mini_cluster = MiniClusterCommandCollection(self)
    self.resource = ResourceCommandCollection(self)
    self.user = UserCommandCollection(self)
    self.ws = WorkspaceCommandCollection(self)

    self.job = JobCommandCollection(self.mini_cluster)

    if self.config.access_token is not None:
        headers = dict(Authorization=f"Bearer {self.config.access_token}")
        self.session.headers.update(headers)

close(self) inherited

Close client.

Source code in crane/lib/sync/user/manager.py
def close(self) -> None:
    """Close client."""
    self.session.close()

is_healthy(self) inherited

Return True if the server responds to a ping.

Source code in crane/lib/sync/user/manager.py
def is_healthy(self) -> bool:
    """Return True if the server responds to a ping."""
    try:
        self.ping()
        return True
    except (HTTPBadResponseError, HTTPError):
        return False
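
The pattern above turns an exception-raising `ping` into a boolean check. The sketch below reproduces it with a stand-in client so it runs without a server; `FakeClient` and `PingError` are hypothetical and only mimic the shape of the real classes.

```python
class PingError(Exception):
    """Hypothetical stand-in for HTTPBadResponseError / HTTPError."""


class FakeClient:
    """Stand-in client whose ping() fails when the server is unreachable."""

    def __init__(self, reachable: bool) -> None:
        self.reachable = reachable

    def ping(self) -> None:
        if not self.reachable:
            raise PingError("no pong")

    def is_healthy(self) -> bool:
        # Swallow only the expected errors; anything else still propagates.
        try:
            self.ping()
            return True
        except PingError:
            return False


healthy = FakeClient(reachable=True).is_healthy()
unhealthy = FakeClient(reachable=False).is_healthy()
```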

ping(self)

Ping... and pong.

Source code in crane/lib/sync/user/manager.py
def ping(self) -> None:
    """Ping... and pong."""
    res = self.session.get(f"/gateway{C.Gateway.URL.PING}", timeout=1)
    assert_response(res, 200)

Async client class for crane users.

Attributes:

Name          Type                          Description
job           JobCommandCollection          Job commands
mini_cluster  MiniClusterCommandCollection  Mini cluster commands
resource      ResourceCommandCollection     Resource commands
user          UserCommandCollection         User commands
superuser     SuperuserCommandCollection    Superuser commands
ws            WorkspaceCommandCollection    Workspace commands

__class__ inherited

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

__instancecheck__(cls, instance) special

Override for isinstance(instance, cls).

Source code in crane/lib/aio/user/manager.py
def __instancecheck__(cls, instance):
    """Override for isinstance(instance, cls)."""
    return _abc_instancecheck(cls, instance)

__new__(mcls, name, bases, namespace, **kwargs) special staticmethod

Create and return a new object. See help(type) for accurate signature.

Source code in crane/lib/aio/user/manager.py
def __new__(mcls, name, bases, namespace, **kwargs):
    cls = super().__new__(mcls, name, bases, namespace, **kwargs)
    _abc_init(cls)
    return cls

__subclasscheck__(cls, subclass) special

Override for issubclass(subclass, cls).

Source code in crane/lib/aio/user/manager.py
def __subclasscheck__(cls, subclass):
    """Override for issubclass(subclass, cls)."""
    return _abc_subclasscheck(cls, subclass)

register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

Source code in crane/lib/aio/user/manager.py
def register(cls, subclass):
    """Register a virtual subclass of an ABC.

    Returns the subclass, to allow usage as a class decorator.
    """
    return _abc_register(cls, subclass)

config_template

Configuration for crane user client.

access_token: Any property writable

Configuration access_token[typing.Optional[str]]

refresh_token: Any property writable

Configuration refresh_token[typing.Optional[str]]

url: Any property writable

Configuration url[]

Meta

Prefix.

save(self) inherited

Save configuration.

Atomic write via write & rename.

Source code in crane/lib/aio/user/manager.py
def save(self) -> None:
    """Save configuration.

    Atomic write via write & rename.
    """
    with atomic_write(self._file_path) as f:
        json.dump(self._config, f)

to_dict(self) inherited

Return original configuration dictionary.

Source code in crane/lib/aio/user/manager.py
def to_dict(self) -> Mapping[str, Any]:
    """Return original configuration dictionary."""
    defaults = getattr(self, "_defaults")
    return dict(ChainMap(self._env_overrides, self._config, defaults))

__init__(self, session, config) special

Initialize.

Source code in crane/lib/aio/user/manager.py
def __init__(self, session: AsyncHTTPClient, config: UserLibConfig) -> None:
    """Initialize."""
    super().__init__(session, config)

    # TODO: implement session that prefixes session
    self.mini_cluster = MiniClusterCommandCollection(self)
    self.resource = ResourceCommandCollection(self)
    self.user = UserCommandCollection(self)
    self.ws = WorkspaceCommandCollection(self)

    self.job = JobCommandCollection(self.mini_cluster)

    if self.config.access_token is not None:
        headers = dict(Authorization=f"Bearer {self.config.access_token}")
        self.session.headers.update(headers)

close(self) async inherited

Close client.

Source code in crane/lib/aio/user/manager.py
async def close(self) -> None:
    """Close client."""
    await self.session.aclose()

is_healthy(self) async inherited

Return True if the server responds to a ping.

Source code in crane/lib/aio/user/manager.py
async def is_healthy(self) -> bool:
    """Return True if the server responds to a ping."""
    try:
        await self.ping()
        return True
    except (HTTPBadResponseError, HTTPError):
        return False

ping(self) async

Ping... and pong.

Source code in crane/lib/aio/user/manager.py
async def ping(self) -> None:
    """Ping... and pong."""
    res = await self.session.get(f"/gateway{C.Gateway.URL.PING}", timeout=1)
    assert_response(res, 200)

Job subcommands.

__init__(self, mini_cluster) special

Initialize.

Source code in crane/lib/aio/user/job.py
def __init__(self, mini_cluster: MiniClusterCommandCollection) -> None:
    """Initialize."""
    self.mini_cluster = mini_cluster

add(self, *args, **kwargs) async

Add a new job to cluster.

Source code in crane/lib/aio/user/job.py
async def add(self, *args, **kwargs) -> MCCreateResponse:
    """Add a new job to cluster."""
    # TODO: args will be added later. Removed now to avoid lint error
    return await self.mini_cluster.add(*args, **kwargs)
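
Because `add` forwards `*args` and `**kwargs` unchanged, the job collection accepts whatever the mini-cluster `add` accepts. The sketch below shows the forwarding with a recording fake; `RecordingMiniCluster` and `JobFacade` are hypothetical, and `"<resource-spec>"` is a placeholder because the resource string format is not described on this page.

```python
import asyncio
from typing import Any


class RecordingMiniCluster:
    """Hypothetical fake that records the arguments it receives."""

    def __init__(self) -> None:
        self.calls: list[tuple[tuple[Any, ...], dict[str, Any]]] = []

    async def add(self, *args: Any, **kwargs: Any) -> str:
        self.calls.append((args, kwargs))
        return "fake-job-id"


class JobFacade:
    """Mirrors the delegation shape of the job collection above."""

    def __init__(self, mini_cluster: RecordingMiniCluster) -> None:
        self.mini_cluster = mini_cluster

    async def add(self, *args: Any, **kwargs: Any) -> str:
        return await self.mini_cluster.add(*args, **kwargs)


mini_cluster = RecordingMiniCluster()
job = JobFacade(mini_cluster)
job_id = asyncio.run(
    job.add("python:3.10", "<resource-spec>", name="demo", tags=["test"])
)
```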

delete(self, job_id) async

Remove a job by its id.

Source code in crane/lib/aio/user/job.py
async def delete(self, job_id: str) -> None:
    """Remove a job by its id."""
    return await self.mini_cluster.delete(job_id)

filter(self, job_id_or_name, tags=None, state=None) async

Filter jobs by id, name or tags.

Source code in crane/lib/aio/user/job.py
async def filter(
    self,
    job_id_or_name: str,
    tags: list[str] | None = None,
    state: list[str] | None = None,
) -> list[str]:
    """Filter jobs by id, name or tags."""
    return await self.mini_cluster.filter(job_id_or_name, tags, state)

inspect(self, job_id) async

Inspect a job by its id.

Source code in crane/lib/aio/user/job.py
async def inspect(self, job_id: str) -> MCInspectResponse:
    """Inspect a job by its id."""
    return await self.mini_cluster.inspect(job_id)

kill(self, job_id, force) async

Kill a job by its id.

Source code in crane/lib/aio/user/job.py
async def kill(self, job_id: str, force: bool) -> None:
    """Kill a job by its id."""
    return await self.mini_cluster.kill(job_id, force)

log(self, job_id, follow=False, filter_=LogFilter(source=None, since=None))

Iterate through a log.

Source code in crane/lib/aio/user/job.py
async def log(
    self,
    job_id: str,
    follow: bool = False,
    filter_: log_model.LogFilter = log_model.LogFilter(),
) -> AsyncIterator[log_model.Log]:
    """Iterate through a log."""
    async for log in self.mini_cluster.log(job_id, follow, filter_):
        yield log
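
`log` is an async generator rather than a coroutine, so it is consumed with `async for` and is not awaited. The sketch below shows the same delegation with stand-in classes; `FakeMiniCluster` and `FakeJob` are hypothetical and yield plain strings instead of log records.

```python
import asyncio
from typing import AsyncIterator


class FakeMiniCluster:
    """Hypothetical stand-in that yields canned log lines."""

    async def log(self, job_id: str) -> AsyncIterator[str]:
        for line in (f"{job_id}: started", f"{job_id}: finished"):
            await asyncio.sleep(0)  # give control back to the event loop
            yield line


class FakeJob:
    def __init__(self, mini_cluster: FakeMiniCluster) -> None:
        self.mini_cluster = mini_cluster

    async def log(self, job_id: str) -> AsyncIterator[str]:
        # `yield from` is not allowed in async generators, hence the loop.
        async for line in self.mini_cluster.log(job_id):
            yield line


async def collect(job_id: str) -> list[str]:
    job = FakeJob(FakeMiniCluster())
    # No `await` on job.log(...): calling it returns an async iterator.
    return [line async for line in job.log(job_id)]


lines = asyncio.run(collect("job-1"))
```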

Job subcommands.

add(self, image, resource, command=None, envs=None, mounts=None, ports=None, public_ports=None, rdma=False, overlay=False, shm_size='64m', name=None, tags=None, init=False, workspace_id=None) async

Add a new job to cluster.

Source code in crane/lib/aio/user/mini_cluster.py
async def add(
    self,
    image: str,
    resource: str,
    command: str | None = None,
    envs: list[str] | None = None,
    mounts: list[str] | None = None,
    ports: list[int] | None = None,
    public_ports: list[int] | None = None,
    rdma: bool = False,
    overlay: bool = False,
    shm_size: str = "64m",
    name: str | None = None,
    tags: list[str] | None = None,
    init: bool = False,
    workspace_id: str | None = None,
) -> MCCreateResponse:
    """Add a new job to cluster."""
    envs = envs or []
    mounts = mounts or []
    ports = ports or []
    public_ports = public_ports or []

    # build config
    resource_spec = _build_resource(resource)

    app_image = container.Image(name=image)
    app_process = container.Process(cmd=command, envs=_build_envs(envs), init=init)
    app_storage = container.Storage(mounts=_build_mounts(mounts))
    app_network = container.Network(
        rdma=rdma, ports=_build_ports(ports, public_ports)
    )
    app_host = container.Host(shm_size=_build_shm_size(shm_size))
    app_config = container.Config(
        image=app_image,
        resource_spec=resource_spec.am_resource,
        process=app_process,
        storage=app_storage,
        network=app_network,
        host=app_host,
    )

    mini_cluster_setting = mini_cluster.MiniClusterSetting(overlay=overlay)
    job_config = mini_cluster.Config(
        app_manager=app_config,
        resource_spec=resource_spec,
        mini_cluster_setting=mini_cluster_setting,
    )
    job_request = MCCreateRequest(
        config=job_config, name=name, workspace_id=workspace_id, tags=tags or []
    )

    res = await self._session.post(
        f"/gateway{C.Gateway.URL.MINI_CLUSTER_LIST}", json=job_request.to_dict()
    )
    assert_response(res, 201)
    job_info = MCCreateResponse.from_dict(res.json())
    return job_info

delete(self, mc_id) async

Remove a mini-cluster by its id.

Source code in crane/lib/aio/user/mini_cluster.py
async def delete(self, mc_id: str) -> None:
    """Remove a mini-cluster by its id."""
    url = f"/gateway{C.Gateway.URL.MINI_CLUSTER_DETAIL.format(mc_id=mc_id)}"
    res = await self._session.delete(url)
    assert_response(res)

filter(self, id_or_name, tags=None, state=None) async

Filter jobs by id, name or tags.

Source code in crane/lib/aio/user/mini_cluster.py
async def filter(
    self,
    id_or_name: str,
    tags: list[str] | None = None,
    state: list[str] | None = None,
) -> list[str]:
    """Filter jobs by id, name or tags."""
    params = dict(id_or_name=id_or_name, tags=tags or [], state=state or [])
    res = await self._session.get(
        f"/gateway{C.Gateway.URL.MINI_CLUSTER_LIST}", params=params
    )
    assert_response(res)

    job_list = res.json()

    return job_list

inspect(self, job_id) async

Inspect a job by its id.

Source code in crane/lib/aio/user/mini_cluster.py
async def inspect(self, job_id: str) -> MCInspectResponse:
    """Inspect a job by its id."""
    url = f"/gateway{C.Gateway.URL.MINI_CLUSTER_DETAIL.format(mc_id=job_id)}"
    res = await self._session.get(url)
    assert_response(res)
    return MCInspectResponse.from_json(res.text)

kill(self, mc_id, force) async

Kill a mini-cluster by its id.

Source code in crane/lib/aio/user/mini_cluster.py
async def kill(self, mc_id: str, force: bool) -> None:
    """Kill a mini-cluster by its id."""
    params = {"force": "true" if force else "false"}
    url = f"/gateway{C.Gateway.URL.MINI_CLUSTER_KILL.format(mc_id=mc_id)}"
    res = await self._session.post(url, params=params)
    assert_response(res)
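
One possible reason for converting `force` by hand is that Python renders booleans as `True`/`False`, which some servers do not accept as query values. The sketch below shows the difference using `urllib.parse.urlencode`; `force_query` is a hypothetical helper, and how the crane HTTP session itself would encode a raw boolean is not covered here.

```python
from urllib.parse import urlencode


def force_query(force: bool) -> dict[str, str]:
    """Render a boolean as the lowercase string used in the query above."""
    return {"force": "true" if force else "false"}


explicit = urlencode(force_query(True))
implicit = urlencode({"force": True})  # falls back to str(True)
```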

log(self, mc_id, follow=False, filter_=LogFilter(source=None, since=None))

Iterate through a log.

Source code in crane/lib/aio/user/mini_cluster.py
async def log(
    self,
    mc_id: str,
    follow: bool = False,
    filter_: log_model.LogFilter = log_model.LogFilter(),
) -> AsyncIterator[log_model.Log]:
    """Iterate through a log."""
    params: dict[str, Any] = dict(follow=follow)
    body = filter_.to_dict()

    url = f"/gateway{C.Gateway.URL.JOB_LOG.format(mc_id=mc_id)}"

    async with self._session.stream("GET", url, params=params, json=body) as res:
        assert_response(res)
        async for line in res.aiter_lines():
            # Filter keep-alive
            if line:
                yield log_model.Log.from_json(line)
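
The loop above treats the response as newline-delimited JSON and skips empty keep-alive lines. A self-contained sketch of that parsing step follows; `LogLine` and its fields are hypothetical, since the fields of `log_model.Log` are not shown on this page.

```python
import json
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class LogLine:
    """Hypothetical record; the real log_model.Log fields are not shown here."""

    source: str
    message: str


def parse_log_stream(lines: Iterable[str]) -> Iterator[LogLine]:
    for line in lines:
        # Empty lines are keep-alives and carry no payload.
        if line:
            yield LogLine(**json.loads(line))


raw = [
    '{"source": "stdout", "message": "hello"}',
    "",
    '{"source": "stderr", "message": "oops"}',
]
parsed = list(parse_log_stream(raw))
```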

Commands for resource.

cluster_resource(self) async

Query resource state.

Returns:

Type                                Description
resource.PhysicalAllocationCluster  cluster resource state

Exceptions:

Type                  Description
HTTPBadResponseError  if status code is not expected
HTTPConnectionError   if connection fails

Source code in crane/lib/aio/user/resource.py
async def cluster_resource(self) -> resource.PhysicalAllocationCluster:
    """Query resource state.

    Returns:
        resource.PhysicalAllocationCluster: cluster resource state

    Raises:
        HTTPBadResponseError: if status code is not expected
        HTTPConnectionError: if connection fails

    """
    res = await self._session.get(f"/gateway{C.Gateway.URL.CLUSTER_RESOURCE}")
    assert_response(res)
    return resource.PhysicalAllocationCluster.from_dict(res.json())

Commands for session management.

sign_in(self, browser=True) async

Sign in to crane.

TODO: add other authorization strategies for scripts

TODO: move console prints to cli level.

Parameters:

Name     Type  Description                                    Default
browser  bool  If true, opens the browser. Defaults to True.  True

Source code in crane/lib/aio/user/user.py
async def sign_in(self, browser: bool = True) -> None:
    """Sign in to crane.

    # TODO: add other authorization strategies for scripts
    # TODO: move console prints to cli level.

    Args:
        browser (bool): If true, opens the browser. Defaults to True.

    """
    # initiate device login flow.
    res = await self._session.get(f"/gateway{C.Gateway.URL.AUTH_DEVICE}")
    assert_response(res)
    flow_info = user.DeviceFlowInfo.from_json(res.text)

    if browser:
        input("Press ENTER to open login page in your browser.")
        has_browser = webbrowser.open(flow_info.redirect_uri)
        if not has_browser:
            # TODO: need better exception class.
            raise OSError("No web browser found.")

    else:
        print("Go to the link below in a web browser.", flush=True)
        print(flow_info.redirect_uri, flush=True)

    # wait until login is completed

    console = Console()
    body = user.TokenRequest(device_code=flow_info.device_code)

    with console.status(
        "[bold green]Checking for authorization results...", spinner="pong"
    ):
        for sleep_duration in generate_sequence("fibonacci", init=2, upper=10):
            await async_sleep(sleep_duration)
            res = await self._session.post(
                f"/gateway{C.Gateway.URL.AUTH_DEVICE_TOKEN}", json=body.to_dict()
            )
            assert_response(res, (200, 429))

            if res.status_code == 200:
                break

    token = user.Token.from_json(res.text)
    self._config.access_token = token.access_token
    self._config.refresh_token = token.refresh_token
    self._config.save()
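
The polling loop above follows the usual device-flow shape: wait, ask for a token, and retry while the server answers 429. The sketch below isolates that loop with a simulated server. `fibonacci_delays` is a hypothetical stand-in for `generate_sequence("fibonacci", init=2, upper=10)`, whose exact output is not documented here, and the `max_attempts` limit is an addition of this sketch.

```python
import itertools
from typing import Callable, Iterator


def fibonacci_delays(init: int = 2, upper: int = 10) -> Iterator[int]:
    """Yield Fibonacci-style delays starting at init, capped at upper."""
    a, b = init, init
    while True:
        yield min(a, upper)
        a, b = b, a + b


def poll_until_authorized(
    check: Callable[[], int],
    sleep: Callable[[float], None],
    max_attempts: int = 20,
) -> bool:
    """Poll check() until it returns 200; 429 means "not yet, try again"."""
    for delay in itertools.islice(fibonacci_delays(), max_attempts):
        sleep(delay)
        status = check()
        if status == 200:
            return True
        if status != 429:
            raise RuntimeError(f"unexpected status {status}")
    return False


# Simulated server: two "slow down" answers, then success.
responses = iter([429, 429, 200])
slept: list[float] = []
authorized = poll_until_authorized(lambda: next(responses), slept.append)
first_delays = list(itertools.islice(fibonacci_delays(), 6))
```

Passing `sleep` in as a parameter keeps the loop testable without real waiting.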

sign_out(self) async

Sign out user.

Exceptions:

Type                  Description
HTTPBadResponseError  if status code is not expected
ConnectError          if connection fails

Source code in crane/lib/aio/user/user.py
async def sign_out(self) -> None:
    """Sign out user.

    Raises:
        HTTPBadResponseError: if status code is not expected
        ConnectError: if connection fails

    """
    self._config.access_token = None
    self._config.refresh_token = None
    self._config.save()

whoami(self) async

Return public user info about this user.

Returns:

Type           Description
user.UserInfo  user info

Exceptions:

Type                  Description
HTTPBadResponseError  if status code is not expected
ConnectError          if connection fails

Source code in crane/lib/aio/user/user.py
async def whoami(self) -> user.UserInfo:
    """Return public user info about this user.

    Returns:
        user.UserInfo: user info

    Raises:
        HTTPBadResponseError: if status code is not expected
        ConnectError: if connection fails

    """
    res = await self._session.get(f"/gateway{C.Gateway.URL.USER_DETAIL}")
    assert_response(res)
    return user.UserInfo.from_json(res.text)

Commands for workspaces.

add(self) async

Zip workspace and send the tarball to server.

Returns:

Type  Description
str   context_id: id of the newly created context

Source code in crane/lib/aio/user/workspace.py
async def add(self) -> str:
    """Zip workspace and send the tarball to server.

    Returns:
        context_id (str): id of newly created context

    """
    zip_file_path = zip_workspace(Path.cwd())
    with open(zip_file_path, "rb") as f:
        files = {"file": ("file", f, "application/x-tar")}
        res = await self._session.post(
            f"/workspace{C.WsServer.URL.WORKSPACE}", files=files
        )

    assert_response(res, 201)
    context_id = res.json()

    # TODO: remove zip_file_path

    return context_id
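
`zip_workspace` is not shown on this page. Since the upload is sent as `application/x-tar`, the packing step might resemble the following standard-library sketch; `tar_directory` is a hypothetical helper, and it ignores any `.craneignore` handling the real function may perform.

```python
import io
import tarfile
import tempfile
from pathlib import Path


def tar_directory(root: Path) -> bytes:
    """Pack every regular file under root into an in-memory tar archive."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as archive:
        for path in sorted(root.rglob("*")):
            if path.is_file():
                # Store paths relative to root so the archive is relocatable.
                archive.add(path, arcname=path.relative_to(root).as_posix())
    return buffer.getvalue()


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "src").mkdir()
    (root / "src" / "main.py").write_text("print('hi')\n", encoding="utf-8")
    (root / "README.md").write_text("demo\n", encoding="utf-8")
    payload = tar_directory(root)

with tarfile.open(fileobj=io.BytesIO(payload), mode="r") as archive:
    names = sorted(archive.getnames())
```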

init(self) async

Initialize crane workspace in git-like manner.

After init, the directory will look like:

/
|_ .crane/
    |_ config.yaml
    |_ .craneignore
    |_ {workspace_id}.tar

Returns:

Type  Description
bool  True if the workspace was already initialized

Source code in crane/lib/aio/user/workspace.py
async def init(self) -> bool:
    """Initialize crane workspace in git-like manner.

    After init function, the directory will look like,
    /
    |_ .crane/
        |_ config.yaml
        |_ .craneignore
        |_ {workspace_id}.tar

    Returns:
        bool: True if the workspace was already initialized

    """
    context_path = Path.cwd() / C.Workspace.CONTEXT_DIR
    config_path = context_path / C.Workspace.CONFIG_PATH

    reinit_flag = config_path.exists()

    if not reinit_flag:
        os.makedirs(context_path, exist_ok=True)

        default_config = {"container_path": _default_container_path()}
        with open(config_path, "w", encoding="utf-8") as f:
            # TODO: implement more on config file
            yaml.dump(default_config, f)

    return reinit_flag
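
The function above is idempotent: the first call creates the config file and returns False, and later calls leave it untouched and return True. The sketch below reproduces that behaviour in a temporary directory. The constants are assumptions taken from the directory layout in the docstring, and the YAML line is written by hand to avoid a third-party dependency.

```python
import tempfile
from pathlib import Path

CONTEXT_DIR = ".crane"  # assumed from the directory layout above
CONFIG_NAME = "config.yaml"  # assumed from the directory layout above


def init_workspace(root: Path, container_path: str) -> bool:
    """Create the config on first call; return True if it already existed."""
    config_path = root / CONTEXT_DIR / CONFIG_NAME
    already_initialized = config_path.exists()
    if not already_initialized:
        config_path.parent.mkdir(parents=True, exist_ok=True)
        # A single "key: value" line is valid YAML for this simple value.
        config_path.write_text(
            f"container_path: {container_path}\n", encoding="utf-8"
        )
    return already_initialized


with tempfile.TemporaryDirectory() as tmp:
    first = init_workspace(Path(tmp), "/workspace")
    second = init_workspace(Path(tmp), "/workspace")
```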

is_workspace_server_alive(self) async

Check if ws server is alive.

Source code in crane/lib/aio/user/workspace.py
async def is_workspace_server_alive(self) -> bool:
    """Check if ws server is alive."""
    res = await self._session.get(f"/workspace{C.WsServer.URL.PING}", timeout=1)
    return res.status_code == 200

Job subcommands.

__init__(self, mini_cluster) special

Initialize.

Source code in crane/lib/sync/user/job.py
def __init__(self, mini_cluster: MiniClusterCommandCollection) -> None:
    """Initialize."""
    self.mini_cluster = mini_cluster

add(self, *args, **kwargs)

Add a new job to cluster.

Source code in crane/lib/sync/user/job.py
def add(self, *args, **kwargs) -> MCCreateResponse:
    """Add a new job to cluster."""
    # TODO: args will be added later. Removed now to avoid lint error
    return self.mini_cluster.add(*args, **kwargs)

delete(self, job_id)

Remove a job by its id.

Source code in crane/lib/sync/user/job.py
def delete(self, job_id: str) -> None:
    """Remove a job by its id."""
    return self.mini_cluster.delete(job_id)

filter(self, job_id_or_name, tags=None, state=None)

Filter jobs by id, name or tags.

Source code in crane/lib/sync/user/job.py
def filter(
    self,
    job_id_or_name: str,
    tags: list[str] | None = None,
    state: list[str] | None = None,
) -> list[str]:
    """Filter jobs by id, name or tags."""
    return self.mini_cluster.filter(job_id_or_name, tags, state)

inspect(self, job_id)

Inspect a job by its id.

Source code in crane/lib/sync/user/job.py
def inspect(self, job_id: str) -> MCInspectResponse:
    """Inspect a job by its id."""
    return self.mini_cluster.inspect(job_id)

kill(self, job_id, force)

Kill a job by its id.

Source code in crane/lib/sync/user/job.py
def kill(self, job_id: str, force: bool) -> None:
    """Kill a job by its id."""
    return self.mini_cluster.kill(job_id, force)

log(self, job_id, follow=False, filter_=LogFilter(source=None, since=None))

Iterate through a log.

Source code in crane/lib/sync/user/job.py
def log(
    self,
    job_id: str,
    follow: bool = False,
    filter_: log_model.LogFilter = log_model.LogFilter(),
) -> Iterator[log_model.Log]:
    """Iterate through a log."""
    for log in self.mini_cluster.log(job_id, follow, filter_):
        yield log

Job subcommands.

add(self, image, resource, command=None, envs=None, mounts=None, ports=None, public_ports=None, rdma=False, overlay=False, shm_size='64m', name=None, tags=None, init=False, workspace_id=None)

Add a new job to cluster.

Source code in crane/lib/sync/user/mini_cluster.py
def add(
    self,
    image: str,
    resource: str,
    command: str | None = None,
    envs: list[str] | None = None,
    mounts: list[str] | None = None,
    ports: list[int] | None = None,
    public_ports: list[int] | None = None,
    rdma: bool = False,
    overlay: bool = False,
    shm_size: str = "64m",
    name: str | None = None,
    tags: list[str] | None = None,
    init: bool = False,
    workspace_id: str | None = None,
) -> MCCreateResponse:
    """Add a new job to cluster."""
    envs = envs or []
    mounts = mounts or []
    ports = ports or []
    public_ports = public_ports or []

    # build config
    resource_spec = _build_resource(resource)

    app_image = container.Image(name=image)
    app_process = container.Process(cmd=command, envs=_build_envs(envs), init=init)
    app_storage = container.Storage(mounts=_build_mounts(mounts))
    app_network = container.Network(
        rdma=rdma, ports=_build_ports(ports, public_ports)
    )
    app_host = container.Host(shm_size=_build_shm_size(shm_size))
    app_config = container.Config(
        image=app_image,
        resource_spec=resource_spec.am_resource,
        process=app_process,
        storage=app_storage,
        network=app_network,
        host=app_host,
    )

    mini_cluster_setting = mini_cluster.MiniClusterSetting(overlay=overlay)
    job_config = mini_cluster.Config(
        app_manager=app_config,
        resource_spec=resource_spec,
        mini_cluster_setting=mini_cluster_setting,
    )
    job_request = MCCreateRequest(
        config=job_config, name=name, workspace_id=workspace_id, tags=tags or []
    )

    res = self._session.post(
        f"/gateway{C.Gateway.URL.MINI_CLUSTER_LIST}", json=job_request.to_dict()
    )
    assert_response(res, 201)
    job_info = MCCreateResponse.from_dict(res.json())
    return job_info

delete(self, mc_id)

Remove a mini-cluster by its id.

Source code in crane/lib/sync/user/mini_cluster.py
def delete(self, mc_id: str) -> None:
    """Remove a mini-cluster by its id."""
    url = f"/gateway{C.Gateway.URL.MINI_CLUSTER_DETAIL.format(mc_id=mc_id)}"
    res = self._session.delete(url)
    assert_response(res)

filter(self, id_or_name, tags=None, state=None)

Filter jobs by id, name or tags.

Source code in crane/lib/sync/user/mini_cluster.py
def filter(
    self,
    id_or_name: str,
    tags: list[str] | None = None,
    state: list[str] | None = None,
) -> list[str]:
    """Filter jobs by id, name or tags."""
    params = dict(id_or_name=id_or_name, tags=tags or [], state=state or [])
    res = self._session.get(
        f"/gateway{C.Gateway.URL.MINI_CLUSTER_LIST}", params=params
    )
    assert_response(res)

    job_list = res.json()

    return job_list

inspect(self, job_id)

Inspect a job by its id.

Source code in crane/lib/sync/user/mini_cluster.py
def inspect(self, job_id: str) -> MCInspectResponse:
    """Inspect a job by its id."""
    url = f"/gateway{C.Gateway.URL.MINI_CLUSTER_DETAIL.format(mc_id=job_id)}"
    res = self._session.get(url)
    assert_response(res)
    return MCInspectResponse.from_json(res.text)

kill(self, mc_id, force)

Kill a mini-cluster by its id.

Source code in crane/lib/sync/user/mini_cluster.py
def kill(self, mc_id: str, force: bool) -> None:
    """Kill a mini-cluster by its id."""
    params = {"force": "true" if force else "false"}
    url = f"/gateway{C.Gateway.URL.MINI_CLUSTER_KILL.format(mc_id=mc_id)}"
    res = self._session.post(url, params=params)
    assert_response(res)

log(self, mc_id, follow=False, filter_=LogFilter(source=None, since=None))

Iterate through a log.

Source code in crane/lib/sync/user/mini_cluster.py
def log(
    self,
    mc_id: str,
    follow: bool = False,
    filter_: log_model.LogFilter = log_model.LogFilter(),
) -> Iterator[log_model.Log]:
    """Iterate through a log."""
    params: dict[str, Any] = dict(follow=follow)
    body = filter_.to_dict()

    url = f"/gateway{C.Gateway.URL.JOB_LOG.format(mc_id=mc_id)}"

    with self._session.stream("GET", url, params=params, json=body) as res:
        assert_response(res)
        for line in res.iter_lines():
            # Filter keep-alive
            if line:
                yield log_model.Log.from_json(line)

Commands for resource.

cluster_resource(self)

Query resource state.

Returns:

Type                                Description
resource.PhysicalAllocationCluster  cluster resource state

Exceptions:

Type                  Description
HTTPBadResponseError  if status code is not expected
HTTPConnectionError   if connection fails

Source code in crane/lib/sync/user/resource.py
def cluster_resource(self) -> resource.PhysicalAllocationCluster:
    """Query resource state.

    Returns:
        resource.PhysicalAllocationCluster: cluster resource state

    Raises:
        HTTPBadResponseError: if status code is not expected
        HTTPConnectionError: if connection fails

    """
    res = self._session.get(f"/gateway{C.Gateway.URL.CLUSTER_RESOURCE}")
    assert_response(res)
    return resource.PhysicalAllocationCluster.from_dict(res.json())

Commands for session management.

sign_in(self, browser=True)

Sign in to crane.

TODO: add other authorization strategies for scripts

TODO: move console prints to cli level.

Parameters:

Name     Type  Description                                    Default
browser  bool  If true, opens the browser. Defaults to True.  True

Source code in crane/lib/sync/user/user.py
def sign_in(self, browser: bool = True) -> None:
    """Sign in to crane.

    # TODO: add other authorization strategies for scripts
    # TODO: move console prints to cli level.

    Args:
        browser (bool): If true, opens the browser. Defaults to True.

    """
    # initiate device login flow.
    res = self._session.get(f"/gateway{C.Gateway.URL.AUTH_DEVICE}")
    assert_response(res)
    flow_info = user.DeviceFlowInfo.from_json(res.text)

    if browser:
        input("Press ENTER to open login page in your browser.")
        has_browser = webbrowser.open(flow_info.redirect_uri)
        if not has_browser:
            # TODO: need better exception class.
            raise OSError("No web browser found.")

    else:
        print("Go to the link below in a web browser.", flush=True)
        print(flow_info.redirect_uri, flush=True)

    # wait until login is completed

    console = Console()
    body = user.TokenRequest(device_code=flow_info.device_code)

    with console.status(
        "[bold green]Checking for authorization results...", spinner="pong"
    ):
        for sleep_duration in generate_sequence("fibonacci", init=2, upper=10):
            sync_sleep(sleep_duration)
            res = self._session.post(
                f"/gateway{C.Gateway.URL.AUTH_DEVICE_TOKEN}", json=body.to_dict()
            )
            assert_response(res, (200, 429))

            if res.status_code == 200:
                break

    token = user.Token.from_json(res.text)
    self._config.access_token = token.access_token
    self._config.refresh_token = token.refresh_token
    self._config.save()

sign_out(self)

Sign out user.

Exceptions:

Type                  Description
HTTPBadResponseError  if status code is not expected
ConnectError          if connection fails

Source code in crane/lib/sync/user/user.py
def sign_out(self) -> None:
    """Sign out user.

    Raises:
        HTTPBadResponseError: if status code is not expected
        ConnectError: if connection fails

    """
    self._config.access_token = None
    self._config.refresh_token = None
    self._config.save()

whoami(self)

Return public user info about this user.

Returns:

Type           Description
user.UserInfo  user info

Exceptions:

Type                  Description
HTTPBadResponseError  if status code is not expected
ConnectError          if connection fails

Source code in crane/lib/sync/user/user.py
def whoami(self) -> user.UserInfo:
    """Return public user info about this user.

    Returns:
        user.UserInfo: user info

    Raises:
        HTTPBadResponseError: if status code is not expected
        ConnectError: if connection fails

    """
    res = self._session.get(f"/gateway{C.Gateway.URL.USER_DETAIL}")
    assert_response(res)
    return user.UserInfo.from_json(res.text)

Commands for workspaces.

add(self)

Zip workspace and send the tarball to server.

Returns:

Type  Description
str   context_id: id of the newly created context

Source code in crane/lib/sync/user/workspace.py
def add(self) -> str:
    """Zip workspace and send the tarball to server.

    Returns:
        context_id (str): id of newly created context

    """
    zip_file_path = zip_workspace(Path.cwd())
    with open(zip_file_path, "rb") as f:
        files = {"file": ("file", f, "application/x-tar")}
        res = self._session.post(
            f"/workspace{C.WsServer.URL.WORKSPACE}", files=files
        )

    assert_response(res, 201)
    context_id = res.json()

    # TODO: remove zip_file_path

    return context_id

init(self)

Initialize crane workspace in git-like manner.

After init, the directory will look like:

/
|_ .crane/
    |_ config.yaml
    |_ .craneignore
    |_ {workspace_id}.tar

Returns:

Type  Description
bool  True if the workspace was already initialized

Source code in crane/lib/sync/user/workspace.py
def init(self) -> bool:
    """Initialize crane workspace in git-like manner.

    After init function, the directory will look like,
    /
    |_ .crane/
        |_ config.yaml
        |_ .craneignore
        |_ {workspace_id}.tar

    Returns:
        bool: True if the workspace was already initialized

    """
    context_path = Path.cwd() / C.Workspace.CONTEXT_DIR
    config_path = context_path / C.Workspace.CONFIG_PATH

    reinit_flag = config_path.exists()

    if not reinit_flag:
        os.makedirs(context_path, exist_ok=True)

        default_config = {"container_path": _default_container_path()}
        with open(config_path, "w", encoding="utf-8") as f:
            # TODO: implement more on config file
            yaml.dump(default_config, f)

    return reinit_flag

is_workspace_server_alive(self)

Check if ws server is alive.

Source code in crane/lib/sync/user/workspace.py
def is_workspace_server_alive(self) -> bool:
    """Check if ws server is alive."""
    res = self._session.get(f"/workspace{C.WsServer.URL.PING}", timeout=1)
    return res.status_code == 200

Last update: February 22, 2022