Skip to content

ExecutionClient

ExecutionClient

ExecutionClient(port: int, host: str = 'localhost', heartbeat_interval: float = 10)

A context manager for executing code in an IPython kernel.

Parameters:

Name Type Description Default
host str

Hostname where the code execution container is running

'localhost'
port int

Host port of the code execution container

required
heartbeat_interval float

Interval in seconds between heartbeat pings. Defaults to 10.

10
Example
from ipybox import ExecutionClient, ExecutionContainer

binds = {"/host/path": "example/path"}
env = {"API_KEY": "secret"}

async with ExecutionContainer(binds=binds, env=env) as container:
    async with ExecutionClient(host="localhost", port=container.port) as client:
        result = await client.execute("print('Hello, world!')")
        print(result.text)

Hello, world!

Source code in ipybox/executor.py
def __init__(self, port: int, host: str = "localhost", heartbeat_interval: float = 10):
    self.port = port
    self.host = host

    self._heartbeat_interval = heartbeat_interval
    self._heartbeat_callback = None

    self._kernel_id = None
    self._ws: WebSocketClientConnection

kernel_id property

kernel_id

The ID of the running IPython kernel.

Raises:

Type Description
ValueError

If not connected to a kernel

connect async

connect(retries: int = 10, retry_interval: float = 1.0)

Creates and connects to an IPython kernel.

Parameters:

Name Type Description Default
retries int

Number of connection attempts. Defaults to 10.

10
retry_interval float

Delay between retries in seconds. Defaults to 1.0.

1.0

Raises:

Type Description
ConnectionError

If connection cannot be established after all retries

Source code in ipybox/executor.py
async def connect(self, retries: int = 10, retry_interval: float = 1.0):
    """Creates and connects to an IPython kernel.

    Args:
        retries: Number of connection attempts. Defaults to 10.
        retry_interval: Delay between retries in seconds. Defaults to 1.0.

    Raises:
        ConnectionError: If connection cannot be established after all retries
    """
    for _ in range(retries):
        try:
            self._kernel_id = await self._create_kernel()
            break
        except Exception:
            await asyncio.sleep(retry_interval)
    else:
        raise ConnectionError("Failed to create kernel")

    self._ws = await websocket_connect(HTTPRequest(url=self.kernel_ws_url))
    logger.info("Connected to kernel")

    self.heartbeat_callback = PeriodicCallback(self._ping_kernel, self._heartbeat_interval * 1000)
    self.heartbeat_callback.start()
    logger.info(f"Started heartbeat (interval = {self._heartbeat_interval}s)")

    await self._init_kernel()

disconnect async

disconnect()

Closes the connection to the kernel and cleans up resources.

Source code in ipybox/executor.py
async def disconnect(self):
    """Closes the connection to the kernel and cleans up resources."""
    self.heartbeat_callback.stop()
    self._ws.close()
    async with aiohttp.ClientSession() as session:
        async with session.delete(self.kernel_http_url):
            pass

execute async

execute(code: str, timeout: float = 120) -> ExecutionResult

Executes code and returns the result.

Parameters:

Name Type Description Default
code str

Code to execute

required
timeout float

Maximum execution time in seconds. Defaults to 120.

120

Returns:

Type Description
ExecutionResult

ExecutionResult object

Raises:

Type Description
ExecutionError

If code execution raised an error

TimeoutError

If execution exceeds timeout duration

Source code in ipybox/executor.py
async def execute(self, code: str, timeout: float = 120) -> ExecutionResult:
    """Executes code and returns the result.

    Args:
        code: Code to execute
        timeout: Maximum execution time in seconds. Defaults to 120.

    Returns:
        ExecutionResult object

    Raises:
        ExecutionError: If code execution raised an error
        asyncio.TimeoutError: If execution exceeds timeout duration
    """
    execution = await self.submit(code)
    return await execution.result(timeout=timeout)

submit async

submit(code: str) -> Execution

Submits code for execution and returns an Execution object to track it.

Parameters:

Name Type Description Default
code str

Python code to execute

required

Returns:

Type Description
Execution

An Execution object to track the submitted code execution

Source code in ipybox/executor.py
async def submit(self, code: str) -> Execution:
    """Submits code for execution and returns an Execution object to track it.

    Args:
        code: Python code to execute

    Returns:
        An Execution object to track the submitted code execution
    """
    req_id = uuid4().hex
    req = {
        "header": {
            "username": "",
            "version": "5.0",
            "session": "",
            "msg_id": req_id,
            "msg_type": "execute_request",
        },
        "parent_header": {},
        "channel": "shell",
        "content": {
            "code": code,
            "silent": False,
            "store_history": False,
            "user_expressions": {},
            "allow_stdin": False,
        },
        "metadata": {},
        "buffers": {},
    }

    await self._send_request(req)
    return Execution(client=self, req_id=req_id)

ExecutionResult dataclass

ExecutionResult(text: str | None, images: list[Image])

The result of a code execution.

Parameters:

Name Type Description Default
text str | None

Output text generated during execution

required
images list[Image]

List of images generated during execution

required

Execution

Execution(client: ExecutionClient, req_id: str)

Represents a code execution in an IPython kernel.

Parameters:

Name Type Description Default
client ExecutionClient

The client instance that created this execution

required
req_id str

Unique identifier for the execution request

required
Source code in ipybox/executor.py
def __init__(self, client: "ExecutionClient", req_id: str):
    self.client = client
    self.req_id = req_id

    self._chunks: list[str] = []
    self._images: list[Image.Image] = []

    self._stream_consumed: bool = False

result async

result(timeout: float = 120) -> ExecutionResult

Waits for execution to complete and returns the final result.

If a timeout is reached, the kernel is interrupted.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait in seconds. Defaults to 120.

120

Returns:

Type Description
ExecutionResult

ExecutionResult object

Raises:

Type Description
TimeoutError

If execution exceeds timeout duration

Source code in ipybox/executor.py
async def result(self, timeout: float = 120) -> ExecutionResult:
    """Waits for execution to complete and returns the final result.

    If a timeout is reached, the kernel is interrupted.

    Args:
        timeout: Maximum time to wait in seconds. Defaults to 120.

    Returns:
        ExecutionResult object

    Raises:
        asyncio.TimeoutError: If execution exceeds timeout duration
    """
    if not self._stream_consumed:
        async for _ in self.stream(timeout=timeout):
            pass

    return ExecutionResult(
        text="".join(self._chunks).strip() if self._chunks else None,
        images=self._images,
    )

stream async

stream(timeout: float = 120) -> AsyncIterator[str]

Streams the execution output text as it becomes available.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait in seconds. Defaults to 120.

120

Yields:

Type Description
AsyncIterator[str]

Output text chunks as they arrive

Raises:

Type Description
TimeoutError

If execution exceeds timeout duration

Source code in ipybox/executor.py
async def stream(self, timeout: float = 120) -> AsyncIterator[str]:
    """Streams the execution output text as it becomes available.

    Args:
        timeout: Maximum time to wait in seconds. Defaults to 120.

    Yields:
        Output text chunks as they arrive

    Raises:
        asyncio.TimeoutError: If execution exceeds timeout duration
    """
    try:
        async with asyncio.timeout(timeout):
            async for elem in self._stream():
                match elem:
                    case str():
                        self._chunks.append(elem)
                        yield elem
                    case Image.Image():
                        self._images.append(elem)
    except asyncio.TimeoutError:
        await self.client._interrupt_kernel()
        await asyncio.sleep(0.2)  # TODO: make configurable
        raise
    finally:
        self._stream_consumed = True

ExecutionError

ExecutionError(message: str, trace: str | None = None)

Bases: Exception

Exception raised when code execution in the IPython kernel fails.

Parameters:

Name Type Description Default
message str

Error message

required
trace str | None

Stack trace string representation

None
Source code in ipybox/executor.py
def __init__(self, message: str, trace: str | None = None):
    super().__init__(message)
    self.trace = trace

ConnectionError

Bases: Exception

Exception raised when connection to an IPython kernel fails.