Skip to content

ExecutionClient

ExecutionClient

ExecutionClient(
    port: int,
    host: str = "localhost",
    heartbeat_interval: float = 10,
)

Context manager for executing code in an IPython kernel running in an ExecutionContainer. The kernel is created on entering the context and destroyed on exit. The container's /app directory is added to the kernel's Python path.

Code execution is stateful for a given ExecutionClient instance. Definitions and variables of previous executions are available to subsequent executions.

Parameters:

Name Type Description Default
port int

Host port for the container's executor port

required
host str

Hostname or IP address of the container's host

'localhost'
heartbeat_interval float

Ping interval for keeping the websocket connection to the IPython kernel alive.

10
Source code in ipybox/executor.py
def __init__(self, port: int, host: str = "localhost", heartbeat_interval: float = 10):
    self.port = port
    self.host = host

    self._heartbeat_interval = heartbeat_interval
    self._heartbeat_callback = None

    self._kernel_id = None
    self._ws: WebSocketClientConnection

kernel_id property

kernel_id

The ID of the running IPython kernel.

Raises:

Type Description
ValueError

If not connected to a kernel

connect async

connect(retries: int = 10, retry_interval: float = 1.0)

Creates an IPython kernel and connects to it.

Parameters:

Name Type Description Default
retries int

Number of connection retries.

10
retry_interval float

Delay between connection retries in seconds.

1.0

Raises:

Type Description
ConnectionError

If connection cannot be established after all retries

Source code in ipybox/executor.py
async def connect(self, retries: int = 10, retry_interval: float = 1.0):
    """Creates an IPython kernel and connects to it.

    Args:
        retries: Number of connection retries.
        retry_interval: Delay between connection retries in seconds.

    Raises:
        ConnectionError: If connection cannot be established after all retries
    """
    for _ in range(retries):
        try:
            self._kernel_id = await self._create_kernel()
            break
        except Exception:
            await asyncio.sleep(retry_interval)
    else:
        raise ConnectionError("Failed to create kernel")

    self._ws = await websocket_connect(HTTPRequest(url=self.kernel_ws_url))
    logger.info("Connected to kernel")

    self.heartbeat_callback = PeriodicCallback(self._ping_kernel, self._heartbeat_interval * 1000)
    self.heartbeat_callback.start()
    logger.info(f"Started heartbeat (interval = {self._heartbeat_interval}s)")

    await self._init_kernel()

disconnect async

disconnect()

Disconnects from and deletes the running IPython kernel.

Source code in ipybox/executor.py
async def disconnect(self):
    """Disconnects from and deletes the running IPython kernel."""
    self.heartbeat_callback.stop()
    self._ws.close()
    async with aiohttp.ClientSession() as session:
        async with session.delete(self.kernel_http_url):
            pass

execute async

execute(code: str, timeout: float = 120) -> ExecutionResult

Executes code in this client's IPython kernel and returns the result.

Parameters:

Name Type Description Default
code str

Code to execute

required
timeout float

Maximum time in seconds to wait for the execution result

120

Raises:

Type Description
ExecutionError

If code execution raises an error

TimeoutError

If code execution duration exceeds the specified timeout

Source code in ipybox/executor.py
async def execute(self, code: str, timeout: float = 120) -> ExecutionResult:
    """Executes code in this client's IPython kernel and returns the result.

    Args:
        code: Code to execute
        timeout: Maximum time in seconds to wait for the execution result

    Raises:
        ExecutionError: If code execution raises an error
        asyncio.TimeoutError: If code execution duration exceeds the specified timeout
    """
    execution = await self.submit(code)
    return await execution.result(timeout=timeout)

submit async

submit(code: str) -> Execution

Submits code for execution in this client's IPython kernel and returns an Execution object for consuming the execution result.

Parameters:

Name Type Description Default
code str

Python code to execute

required

Returns:

Type Description
Execution

A Execution object to track the code execution.

Source code in ipybox/executor.py
async def submit(self, code: str) -> Execution:
    """Submits code for execution in this client's IPython kernel and returns an
    [`Execution`][ipybox.executor.Execution] object for consuming the execution result.

    Args:
        code: Python code to execute

    Returns:
        A [`Execution`][ipybox.executor.Execution] object to track the code execution.
    """
    req_id = uuid4().hex
    req = {
        "header": {
            "username": "",
            "version": "5.0",
            "session": "",
            "msg_id": req_id,
            "msg_type": "execute_request",
        },
        "parent_header": {},
        "channel": "shell",
        "content": {
            "code": code,
            "silent": False,
            "store_history": False,
            "user_expressions": {},
            "allow_stdin": False,
        },
        "metadata": {},
        "buffers": {},
    }

    await self._send_request(req)
    return Execution(client=self, req_id=req_id)

ExecutionResult dataclass

ExecutionResult(text: str | None, images: list[Image])

The result of a successful code execution.

Parameters:

Name Type Description Default
text str | None

Output text generated during execution

required
images list[Image]

List of images generated during execution

required

Execution

Execution(client: ExecutionClient, req_id: str)

A code execution in an IPython kernel.

Parameters:

Name Type Description Default
client ExecutionClient

The client that initiated this code execution

required
req_id str

Unique identifier of the code execution request

required
Source code in ipybox/executor.py
def __init__(self, client: "ExecutionClient", req_id: str):
    self.client = client
    self.req_id = req_id

    self._chunks: list[str] = []
    self._images: list[Image.Image] = []

    self._stream_consumed: bool = False

result async

result(timeout: float = 120) -> ExecutionResult

Retrieves the complete result of this code execution. Waits until the result is available.

Parameters:

Name Type Description Default
timeout float

Maximum time in seconds to wait for the execution result

120

Raises:

Type Description
ExecutionError

If code execution raises an error

TimeoutError

If code execution duration exceeds the specified timeout

Source code in ipybox/executor.py
async def result(self, timeout: float = 120) -> ExecutionResult:
    """Retrieves the complete result of this code execution. Waits until the
    result is available.

    Args:
        timeout: Maximum time in seconds to wait for the execution result

    Raises:
        ExecutionError: If code execution raises an error
        asyncio.TimeoutError: If code execution duration exceeds the specified timeout
    """
    if not self._stream_consumed:
        async for _ in self.stream(timeout=timeout):
            pass

    return ExecutionResult(
        text="".join(self._chunks).strip() if self._chunks else None,
        images=self._images,
    )

stream async

stream(timeout: float = 120) -> AsyncIterator[str]

Streams the code execution result as it is generated. Once the stream is consumed, a result is immediately available without waiting.

Generated images are not streamed. They can be obtained from the return value of result.

Parameters:

Name Type Description Default
timeout float

Maximum time in seconds to wait for the complete execution result

120

Raises:

Type Description
ExecutionError

If code execution raises an error

TimeoutError

If code execution duration exceeds the specified timeout

Source code in ipybox/executor.py
async def stream(self, timeout: float = 120) -> AsyncIterator[str]:
    """Streams the code execution result as it is generated. Once the stream
    is consumed, a [`result`][ipybox.executor.Execution.result] is immediately
    available without waiting.

    Generated images are not streamed. They can be obtained from the
    return value of [`result`][ipybox.executor.Execution.result].

    Args:
        timeout: Maximum time in seconds to wait for the complete execution result

    Raises:
        ExecutionError: If code execution raises an error
        asyncio.TimeoutError: If code execution duration exceeds the specified timeout
    """
    try:
        async with asyncio.timeout(timeout):
            async for elem in self._stream():
                match elem:
                    case str():
                        self._chunks.append(elem)
                        yield elem
                    case Image.Image():
                        self._images.append(elem)
    except asyncio.TimeoutError:
        await self.client._interrupt_kernel()
        await asyncio.sleep(0.2)  # TODO: make configurable
        raise
    finally:
        self._stream_consumed = True

ExecutionError

ExecutionError(message: str, trace: str | None = None)

Bases: Exception

Raised when code execution in an IPython kernel raises an error.

Parameters:

Name Type Description Default
message str

Error message

required
trace str | None

String representation of the stack trace.

None
Source code in ipybox/executor.py
def __init__(self, message: str, trace: str | None = None):
    super().__init__(message)
    self.trace = trace

ConnectionError

Bases: Exception

Raised when a connection to an IPython kernel cannot be established.