# Group Genie

> Multi-party conversation intelligence for AI agents

Group Genie enables single-user AI agents to participate in group chat conversations without requiring modification to the agents themselves. It combines Group Sense's intelligent pattern detection with a flexible agent integration layer. Agents can be based on any technology stack and integrated through a simple interface, with default implementations provided for Pydantic AI and the OpenAI Agents SDK.

# User Guide

# Group Genie

## Introduction

Group Genie enables single-user AI agents to participate in group chat conversations without requiring modification to the agents themselves. While many AI agents excel at responding to direct queries from individual users, they typically cannot handle multi-party conversations where relevant information emerges from complex exchanges between multiple participants.

Group Genie solves this by combining [Group Sense](https://gradion-ai.github.io/group-sense/)'s intelligent pattern detection with a flexible agent integration layer. Agents can be based on any technology stack (framework, API, etc.) and integrated through a simple agent interface, with default implementations provided for Pydantic AI and the OpenAI Agents SDK.

## Key Features

- **Group conversation understanding**: Built on [Group Sense](https://gradion-ai.github.io/group-sense/), which monitors group chats, detects conversation patterns, and reformulates multi-party exchanges into self-contained queries that AI agents can process.
- **Dynamic response routing**: Group Sense reasoners determine the recipients of agent responses based on conversation context and semantics, enabling agents to respond to the appropriate group members.
- **Agent framework support**: Pre-built Agent implementations for Pydantic AI and the OpenAI Agents SDK, allowing you to use existing agents from these frameworks in group chats without modification.
- **Agent hierarchies**: Organize agents into coordinator-subagent hierarchies of any depth, each agent having its own context window for focused conversations and task-specific expertise.
- **Acting on behalf of users**: Agents can act on behalf of individual group members using their credentials, enabling secure access to a user's private resources while maintaining proper access boundaries between users.
- **Agent lifecycle management**: Configurable idle timeouts optimize resource usage by automatically stopping idle agents and freeing their memory and MCP server connections.
- **Session persistence**: File-based persistence for group chat messages and agent states allows group sessions to be suspended and resumed.
- **Unified tool approval mechanism**: Consistent approval workflow for tool calls across agent hierarchies, with support for manual and automatic approval modes.
- **Rich message support**: Handles message attachments with automatic propagation through agent chains. Thread references provide context from related group chats.

## Next steps

1. [Install](installation/) the library and configure API keys
2. Follow the [tutorial](tutorial/) to build your first group chat agent
3. Learn how to [integrate](integration/) Group Genie into your application

## LLM-optimized documentation

- [llms.txt](/group-genie/llms.txt)
- [llms-full.txt](/group-genie/llms-full.txt)

# Installation

## Python Package

```bash
pip install group-genie
```

## Development Setup

For setting up a development environment, see [DEVELOPMENT.md](https://github.com/gradion-ai/group-genie/blob/main/DEVELOPMENT.md).

# Tutorial

This tutorial walks you through a complete example that demonstrates how Group Genie enables AI agents to participate in group chat conversations. You'll learn how to set up a group session, configure a group reasoner to detect conversation patterns, and connect an agent that responds to queries generated by the group reasoner.
## About Group Genie

While many AI agents excel at responding to direct queries from individual users, they typically cannot handle multi-party conversations where relevant information emerges from complex exchanges between multiple participants. Group Genie combines [Group Sense](https://gradion-ai.github.io/group-sense/)'s intelligent pattern detection with a flexible agent integration layer, allowing existing single-user agents to participate naturally in group chats without requiring any modification to the agents themselves.

## Example Scenario

In this example, we'll create a fact-checking assistant that monitors a group chat for factual inconsistencies. When the group reasoner detects contradictory statements, it sends a self-contained query, reformulated from the conversation context, to the system agent to verify facts through web search and respond to the group.

Consider this group chat exchange:

- `user1`: "I'm going to Vienna tomorrow"
- `user2`: "Enjoy your time there!"
- `user3`: "Cool, plan a visit to the Hofbräuhaus!"

The third message contains a factual inconsistency (the Hofbräuhaus is in Munich, not Vienna). The group reasoner will detect this and delegate to the system agent, which will search the web, identify the mistake, and respond with a clarification.

## Core Components

The tutorial demonstrates four essential components:

### Group Session

GroupSession is the main entry point that orchestrates message flow through group reasoners and agents. It provides concurrent processing for different users while ensuring messages from the same sender are processed sequentially. It also manages the lifecycle of both reasoners and agents.

### Group Reasoner

A GroupReasoner analyzes group chat messages and decides whether to ignore them or generate queries for agents.
In this example, we use DefaultGroupReasoner configured with a [fact-checking](https://github.com/gradion-ai/group-genie/blob/main/examples/prompts/reasoner/fact_check.md) prompt that detects contradictory statements between messages.

### System Agent

The system agent is the primary target for processing delegated queries generated by the group reasoner. It can be any Agent; the name *system agent* reflects that it can serve as a facade to a larger system of agents (e.g. acting as a coordinator that delegates to subagents), but it can also be a standalone agent that directly handles queries.

In this example, we use a standalone DefaultAgent with access to web search through an MCP server, allowing it to verify facts by searching online.

### Factories

Each group chat participant owns both a group reasoner instance and a system agent instance, allowing independent reasoning state and credentials. These are created by:

- **GroupReasonerFactory**: creates group reasoners with user-specific system prompts and reasoning states
- **AgentFactory**: creates system agent instances with user-specific credentials, enabling agents to act on behalf of individual group members

## Implementation Walkthrough

### Secrets Provider

We implement SecretsProvider, an interface designed to retrieve user-specific credentials:

examples/factory/secrets.py

```python
import os

from group_genie.secrets import SecretsProvider


class EnvironmentSecretsProvider(SecretsProvider):
    def get_secrets(self, username: str) -> dict[str, str] | None:
        # For development: use environment variables for all users
        var_names = ["OPENAI_API_KEY", "GOOGLE_API_KEY", "BRAVE_API_KEY"]
        return {var_name: os.getenv(var_name, "") for var_name in var_names}
```

In this development example, we just return the same set of environment variables for all users. In production, you would implement per-user credential storage and retrieval.
### Group Reasoner Factory

We use the `get_group_reasoner_factory` helper to obtain a GroupReasonerFactory for creating user-specific reasoner instances:

examples/factory/pydantic_ai/reasoner_factory.py

```python
from functools import partial

from pydantic_ai.models.google import GoogleModel
from pydantic_ai.providers.google import GoogleProvider

from examples.utils import load_reasoner_template
from group_genie.agent.provider.pydantic_ai import DefaultGroupReasoner
from group_genie.reasoner import GroupReasoner, GroupReasonerFactory
from group_genie.secrets import SecretsProvider


def create_group_reasoner(
    system_template: str,
    secrets: dict[str, str],
    owner: str,
) -> GroupReasoner:
    model = GoogleModel(
        "gemini-2.5-flash",
        provider=GoogleProvider(api_key=secrets.get("GOOGLE_API_KEY", "")),
    )
    return DefaultGroupReasoner(
        system_prompt=system_template.format(owner=owner),
        model=model,
    )


def get_group_reasoner_factory(
    secrets_provider: SecretsProvider | None = None,
    template_name: str = "general_assist",
):
    system_template = load_reasoner_template(template_name)
    return GroupReasonerFactory(
        group_reasoner_factory_fn=partial(create_group_reasoner, system_template),
        secrets_provider=secrets_provider,
    )
```

The `create_group_reasoner` function receives a system prompt template, secrets, and the owner's username and returns a configured GroupReasoner. It:

1. Creates a Gemini model instance with the owner's Google API key from their secrets
2. Formats the system prompt template with the owner's username
3. Returns a DefaultGroupReasoner configured with the formatted system prompt and model

### Agent Factory

**Agent frameworks**: Group Genie supports multiple agent frameworks through the Agent interface. The following example factory is defined in [`pydantic_ai/agent_factory_1.py`](https://github.com/gradion-ai/group-genie/blob/main/examples/factory/pydantic_ai/agent_factory_1.py).
It uses [Pydantic AI](https://ai.pydantic.dev/) through a default implementation of the Agent interface. An equivalent example using the [OpenAI Agents SDK](https://openai.github.io/openai-agents-python/) with another default implementation of the Agent interface is defined in [`openai/agent_factory_1.py`](https://github.com/gradion-ai/group-genie/blob/main/examples/factory/openai/agent_factory_1.py). You can also integrate any other agent framework or API by implementing the Agent interface directly.

We use the `get_agent_factory` helper to obtain an AgentFactory for creating user-specific system agent instances:

examples/factory/pydantic_ai/agent_factory_1.py

```python
from pydantic_ai.mcp import MCPServerStdio
from pydantic_ai.models.google import GoogleModel, GoogleModelSettings
from pydantic_ai.providers.google import GoogleProvider

from group_genie.agent import Agent, AgentFactory
from group_genie.agent.provider.pydantic_ai import DefaultAgent
from group_genie.secrets import SecretsProvider


def create_system_agent(secrets: dict[str, str]) -> Agent:
    brave_mcp_server = MCPServerStdio(
        command="npx",
        args=["-y", "@modelcontextprotocol/server-brave-search"],
        env={
            "BRAVE_API_KEY": secrets.get("BRAVE_API_KEY", ""),
        },
    )
    model = GoogleModel(
        "gemini-2.5-flash",
        provider=GoogleProvider(api_key=secrets.get("GOOGLE_API_KEY", "")),
    )
    return DefaultAgent(
        system_prompt=(
            "You are a helpful assistant. "
            "Always search the web for checking facts. "
            "Provide short, concise answers."
        ),
        model=model,
        model_settings=GoogleModelSettings(
            google_thinking_config={
                "thinking_budget": 0,
            }
        ),
        toolsets=[brave_mcp_server],
    )


def get_agent_factory(secrets_provider: SecretsProvider | None = None):
    return AgentFactory(
        system_agent_factory=create_system_agent,
        secrets_provider=secrets_provider,
    )
```

The `create_system_agent` function receives the owner's secrets and returns a configured Agent. It:

1. Configures a Brave Search MCP server with the owner's Brave API key
2. Configures a Gemini model instance with the owner's Google API key
3. Returns a DefaultAgent configured with:
    - A system prompt instructing it to search the web for fact-checking
    - The configured Gemini model
    - The configured Brave Search MCP server

### Group Session

Now we bring everything together by creating a GroupSession:

examples/guide/tutorial.py

```python
import asyncio
import logging
from pathlib import Path
from uuid import uuid4

from examples.factory.pydantic_ai.agent_factory_1 import get_agent_factory
from examples.factory.pydantic_ai.reasoner_factory import get_group_reasoner_factory
from examples.factory.secrets import EnvironmentSecretsProvider
from group_genie.agent import Approval, Decision
from group_genie.datastore import DataStore
from group_genie.message import Message
from group_genie.session import Execution, GroupSession

logger = logging.getLogger(__name__)


async def complete_execution(execution: Execution) -> None:
    async for elem in execution.stream():
        match elem:
            case Decision():
                # log group reasoner decision
                logger.debug(elem)
            case Approval():
                # log tool call approval request
                logger.debug(elem)
                # approve tool call
                elem.approve()
            case Message():
                # log agent response
                logger.debug(elem)


async def main() -> None:
    secrets_provider = EnvironmentSecretsProvider()
    session_id = uuid4().hex[:8]

    session = GroupSession(
        id=session_id,
        group_reasoner_factory=get_group_reasoner_factory(
            secrets_provider=secrets_provider,
            template_name="fact_check",
        ),
        agent_factory=get_agent_factory(secrets_provider=secrets_provider),
        data_store=DataStore(root_path=Path(".data", "tutorial")),
    )

    # example group chat messages
    chat = [
        # No factual inconsistency, group reasoner will ignore the message.
        Message(content="I'm going to Vienna tomorrow", sender="user1"),
        # No factual inconsistency, group reasoner will ignore the message.
        Message(content="Enjoy your time there!", sender="user2"),
        # Factual inconsistency in response to user1's message.
        # Group reasoner will delegate to system agent for fact checking.
        Message(content="Cool, plan a visit to the Hofbräuhaus!", sender="user3"),
    ]

    # Add chat messages to session and create execution objects
    executions = [session.handle(msg) for msg in chat]

    # Concurrently process group chat messages. The complete_execution()
    # helper logs reasoner decisions and agent responses to the console.
    coros = [complete_execution(exec) for exec in executions]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())
```

This code:

1. Creates a [secrets provider](#secrets-provider) to supply API keys
2. Generates a unique session ID for this group chat
3. Initializes a GroupSession with:
    - A group reasoner factory configured for fact-checking
    - An agent factory that creates system agents with web search capabilities
    - A DataStore for persisting messages, reasoner and agent states
4. Defines a sample group chat with three messages, where the third contains a factual inconsistency
5. Handles each message by calling handle(), which returns an Execution object
6. Concurrently consumes execution streams with the `complete_execution` helper and `asyncio.gather()`

An execution stream yields three types of elements:

- `Decision`: The group reasoner's decision about whether to ignore or delegate the message
- `Approval`: Requests for approval of tool calls (e.g., web searches)
- `Message`: The agent's response to be sent to the group

In this example, we log all events and automatically approve all tool calls. In a production application, you might implement selective approval logic or user confirmation for sensitive operations.

## Running the Example

**Development Setup**: To set up the environment for running the example, see [Development Setup](../installation/#development-setup).

To run this example:

1. Set up your API keys:

    ```bash
    export GOOGLE_API_KEY="your-gemini-api-key"
    export BRAVE_API_KEY="your-brave-search-api-key"
    ```

2. Run the tutorial script:

    ```bash
    python examples/guide/tutorial.py
    ```

The output will show the group reasoner detecting the factual contradiction and the agent searching the web to verify that the Hofbräuhaus is actually in Munich, not Vienna. The agent will then generate a response clarifying this mistake for the group. The output should look like this:

```text
2025-11-05 11:06:15,947 DEBUG __main__: Decision.IGNORE
2025-11-05 11:06:17,085 DEBUG __main__: Decision.IGNORE
2025-11-05 11:06:19,526 DEBUG __main__: Decision.DELEGATE
2025-11-05 11:06:20,336 DEBUG __main__: [sender="system"] brave_web_search(query='Hofbräuhaus location')
2025-11-05 11:06:22,399 DEBUG __main__: Message(content='The Hofbräuhaus is a famous beer hall located in Munich, Germany, not Vienna, Austria. Therefore, a visit to the Hofbräuhaus would not be possible if you are going to Vienna.', sender='system', receiver=None, threads=[], attachments=[], request_id=None)
```

# Chat Server Integration

This guide demonstrates how to integrate Group Genie into a chat server to enable multi-party reasoning and agent responses in group chat environments. We'll use the [group-terminal](https://github.com/gradion-ai/group-terminal) chat server as a reference implementation, but the patterns apply to any chat server architecture.

Note: This integration guide assumes you are familiar with the Group Genie [tutorial](../tutorial/).

## Overview

Integrating Group Genie into a chat server involves connecting two key components:

1. **Chat Server**: Receives messages from users and broadcasts responses back to connected clients
2. **Group Session**: Processes messages through group reasoning and agent execution, and publishes responses to the group

## Key Integration Points

A chat server usually provides two integration points. The following are specific to the [group-terminal](https://gradion-ai.github.io/group-terminal/) chat server:

1. **Message Handler**: A callback that receives incoming messages from chat clients
    - Signature: `async def handler(content: str, sender: str)`
    - Called sequentially in message arrival order
    - Must be non-blocking to avoid delaying subsequent messages
2. **Send Message**: A method to broadcast responses back to chat clients
    - Signature: `async def send_message(content: str, sender: str)`
    - Called from message processing tasks to deliver agent responses

## Implementation Pattern

### Message Ingestion

```python
from asyncio import create_task
from pathlib import Path

from group_terminal.server import ChatServer

from group_genie.datastore import DataStore
from group_genie.message import Message
from group_genie.session import GroupSession


class App:
    def __init__(
        self,
        group_reasoner_factory,
        agent_factory,
        session_id,
        host="0.0.0.0",
        port=8723,
    ):
        # Persistent storage for session messages and state
        data_store = DataStore(root_path=Path(".data", session_id))

        # Initialize Group Session
        self._session = GroupSession(
            id=session_id,
            group_reasoner_factory=group_reasoner_factory,
            agent_factory=agent_factory,
            data_store=data_store,
        )

        # Initialize chat server and register message handler
        self._server = ChatServer(host=host, port=port)
        self._server.add_handler(self._handle_message)

    async def _handle_message(self, content: str, sender: str):
        # Create Group Genie message from chat message
        message = Message(content=content, sender=sender)

        # Add message to session in arrival order.
        # handle() must be called sequentially
        # to maintain consistent message ordering.
        execution = self._session.handle(message)

        # Process messages asynchronously to avoid blocking
        # the message handler from receiving subsequent messages
        create_task(self._complete_execution(execution))
```

Message ingestion includes:

- `session.handle(message)` is called synchronously in the message handler, ensuring messages are added to the session in arrival order
- `create_task()` runs the message processing coroutine asynchronously, allowing the message handler to return immediately and receive the next message
- This design maintains ordering guarantees while preventing long-running agent executions from blocking the message queue

## Message Processing

The `_complete_execution()` message processing coroutine processes three types of elements from an Execution stream:

```python
import logging

from group_genie.agent import Approval, Decision
from group_genie.message import Message
from group_genie.session import Execution

logger = logging.getLogger(__name__)


class App:  # continued from above
    async def _complete_execution(self, execution: Execution):
        async for elem in execution.stream():
            match elem:
                case Decision() as decision:
                    # Group reasoner decided to IGNORE or DELEGATE
                    logger.debug(f"Reasoner decision: {decision.value}")
                case Approval() as approval:
                    # Agent requests tool call approval
                    logger.debug(f"Auto-approve {approval}")
                    approval.approve()
                case Message() as message:
                    # Agent generated a response message
                    logger.debug(f"Agent response: {message.content}")
                    await self._server.send_message(
                        message.content, sender=message.sender
                    )
```

- **Decision**: Logs the group reasoner's decision (IGNORE or DELEGATE). If IGNORE, the stream completes with no further elements.
- **Approval**: Handles tool call approval requests. In this example, all tool calls are auto-approved. Production systems might implement manual approval workflows.
- **Message**: Sends agent responses back to chat clients via the server's `send_message` method.

## Complete Example

The complete example is available at [examples/guide/chat.py](https://github.com/gradion-ai/group-genie/blob/main/examples/guide/chat.py).
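The ingestion pattern (synchronous `handle()` for ordering, `create_task()` for concurrency) can be sketched independently of the library. `FakeSession` below is an illustrative stand-in, not a Group Genie class:

```python
import asyncio


class FakeSession:
    """Stand-in for GroupSession: records arrival order synchronously,
    then completes each message in a background task."""

    def __init__(self):
        self.arrival_order: list[str] = []
        self.completed: list[str] = []

    def handle(self, content: str) -> str:
        # Synchronous part: fixes the message's position in the session.
        self.arrival_order.append(content)
        return content

    async def complete(self, content: str):
        # Asynchronous part: simulates a long-running agent execution.
        await asyncio.sleep(0.01)
        self.completed.append(content)


async def main() -> FakeSession:
    session = FakeSession()
    tasks = []
    for msg in ["m1", "m2", "m3"]:
        execution = session.handle(msg)  # called sequentially, in arrival order
        # Completion runs concurrently; the handler is free to accept the next message.
        tasks.append(asyncio.create_task(session.complete(execution)))
    await asyncio.gather(*tasks)
    return session


session = asyncio.run(main())
```

After `main()` returns, `arrival_order` is exactly the order in which `handle()` was called, while the completions ran concurrently in the background.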
### Running the Example

Start the chat server with the [fact-checking](https://github.com/gradion-ai/group-genie/blob/main/examples/prompts/reasoner/fact_check.md) template:

```bash
python examples/guide/chat.py --template-name fact_check
```

In separate terminals, launch three clients:

```bash
python -m group_terminal.client --username user1
python -m group_terminal.client --username user2
python -m group_terminal.client --username user3
```

### Example Conversation

The screenshots below (not included here) show the fact-checking scenario from the tutorial, but running in a group chat environment. The same three users (user1, user2, user3) participate in the conversation:

- `user1`'s view *(screenshot)*
- `user2`'s view *(screenshot)*
- `user3`'s view *(screenshot)*

The group reasoner detects the factual inconsistency (the Hofbräuhaus is in Munich, not Vienna) and delegates to the system agent to resolve it.

# API Reference

## group_genie.session.GroupSession

```python
GroupSession(id: str, group_reasoner_factory: GroupReasonerFactory, agent_factory: AgentFactory, data_store: DataStore | None = None, preferences_source: PreferencesSource | None = None)
```

Main entry point for managing group chat sessions with AI agents.

GroupSession orchestrates the flow of messages through group reasoners and agents, managing their lifecycle and state persistence. It maintains message ordering, handles concurrent processing for different users, and provides graceful shutdown.

Messages are stored internally in the order of handle() calls and processed concurrently for different senders. Messages from the same sender are always processed sequentially.

Persisted session state (messages and agent/reasoner state) is automatically loaded during initialization if a DataStore is provided.
Example

```python
session = GroupSession(
    id="session123",
    group_reasoner_factory=create_group_reasoner_factory(),
    agent_factory=create_agent_factory(),
    data_store=DataStore(root_path=Path(".data/sessions/session123")),
)

# Handle incoming message
execution = session.handle(
    Message(content="What's the weather in Vienna?", sender="alice")
)

# Process execution
async for elem in execution.stream():
    match elem:
        case Decision.DELEGATE:
            print("Query delegated to agent")
        case Approval() as approval:
            approval.approve()
        case Message() as response:
            print(f"Response: {response.content}")

# Gracefully stop session
session.stop()
await session.join()
```

Initialize a new group chat session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `id` | `str` | Unique identifier for this session. Used as the root key for persisted state in the DataStore. | *required* |
| `group_reasoner_factory` | `GroupReasonerFactory` | Factory for creating group reasoner instances that decide when to delegate messages to agents. | *required* |
| `agent_factory` | `AgentFactory` | Factory for creating agent instances that process delegated queries. | *required* |
| `data_store` | `DataStore \| None` | Optional persistent storage for session messages and agent state. If provided, session state is automatically loaded on initialization and saved after each message. Experimental feature, not suitable for production. | `None` |
| `preferences_source` | `PreferencesSource \| None` | Optional source for user-specific preferences that are included in agent prompts. | `None` |

### get_group_chat_messages

```python
get_group_chat_messages() -> str
```

Returns the group chat messages as a JSON string.

### handle

```python
handle(message: Message) -> Execution
```

Process an incoming group chat message.

Adds the message to the session's message history and initiates processing through group reasoners and agents. Returns immediately with an Execution object that can be used to retrieve results.

Messages are stored in the order handle() is called. For different senders, messages are processed concurrently. For the same sender, messages are processed sequentially to maintain conversation coherence.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `message` | `Message` | The message to process. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Execution` | An Execution object that provides access to the processing stream and final result. |

### join

```python
join()
```

Wait for the session to complete shutdown.

Blocks until all internal workers, agents, and reasoners have stopped. Must be called after stop() to ensure proper cleanup.

### load_messages

```python
load_messages(data_store: DataStore) -> list[Message] | None
```

Load persisted messages from a data store.

Utility method for accessing session messages without creating a full GroupSession instance. Automatically called during session initialization.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data_store` | `DataStore` | DataStore containing the session data to load. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[Message] \| None` | The persisted messages, or None if no messages have been stored. |

### request_ids

```python
request_ids() -> Future[set[str]]
```

Retrieve all request IDs from messages in this session.
Returns:

| Type | Description |
| --- | --- |
| `Future[set[str]]` | A Future that resolves to a set of request IDs from all messages that have been processed by this session. Only includes messages with non-None request_id values. |

### stop

```python
stop()
```

Request graceful shutdown of the session.

Allows currently processing messages to complete before stopping all group reasoners and agents. Call join() after stop() to wait for shutdown completion.

## group_genie.session.Execution

```python
Execution(preferences_source: PreferencesSource | None = None)
```

Represents the asynchronous processing of a message through the system.

Execution provides access to the stream of events (decision, approvals, and responses) generated while processing a message. It allows applications to monitor progress, handle approval requests, and retrieve the final result.

The execution stream follows a guaranteed order:

1. One Decision (IGNORE or DELEGATE)
2. Zero or more Approval requests (only if DELEGATE and tools/subagents are called)
3. One Message (only if DELEGATE, containing the agent's response)

Multiple calls to stream() are safe and will return the cached result after the first complete iteration.
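The guaranteed ordering and the replay-after-first-iteration behavior can be illustrated with a self-contained sketch. `FakeExecution` below is an illustrative stand-in, not the library implementation:

```python
import asyncio
from enum import Enum


class Decision(Enum):
    IGNORE = "ignore"
    DELEGATE = "delegate"


class FakeExecution:
    """Illustrative stand-in: yields a Decision, then approvals, then the
    final response, caching events so stream() can be iterated again."""

    def __init__(self, events):
        self._events = events
        self._cache = []
        self._done = False

    async def stream(self):
        if self._done:
            # Subsequent iterations replay the cached events immediately.
            for event in self._cache:
                yield event
            return
        for event in self._events:
            await asyncio.sleep(0)  # simulate asynchronous event production
            self._cache.append(event)
            yield event
        self._done = True


async def collect(execution):
    return [event async for event in execution.stream()]


execution = FakeExecution([Decision.DELEGATE, "approval:web_search", "final message"])
first = asyncio.run(collect(execution))
second = asyncio.run(collect(execution))  # served from the cache
```

Both iterations yield the same sequence: the Decision first, then any approval requests, then the final message.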
Example

```python
execution = session.handle(message)

# Stream events
async for elem in execution.stream():
    match elem:
        case Decision.IGNORE:
            print("Message ignored by reasoner")
        case Decision.DELEGATE:
            print("Message delegated to agent")
        case Approval() as approval:
            print(f"Tool call requires approval: {approval}")
            approval.approve()
        case Message() as response:
            print(f"Agent response: {response.content}")

# Or get result directly (auto-approves all tool calls)
result = await execution.result()
if result:
    print(f"Response: {result.content}")
```

### result

```python
result() -> Message | None
```

Retrieve the final message result, automatically approving all tool calls.

Convenience method that streams through all events, auto-approving any Approval requests, and returns the final Message. Useful when manual approval handling is not needed.

Returns:

| Type | Description |
| --- | --- |
| `Message \| None` | The agent's final response, or None if the message was ignored. |

### stream

```python
stream() -> AsyncIterator[Decision | Approval | Message]
```

Stream execution events as they occur.

Yields events in guaranteed order:

1. One Decision (IGNORE or DELEGATE)
2. Zero or more Approval requests (if DELEGATE and tools are called)
3. One Message (if DELEGATE, containing the final response)

Agent execution blocks on Approval requests until they are approved or denied. Applications must handle all emitted Approvals by calling approve() or deny(). If auto_approve is enabled in the ApprovalContext, Approval events are not emitted and all tool calls are automatically approved.

Can be called multiple times. After the first complete iteration, cached results are returned immediately.
Yields:

| Type | Description |
| --- | --- |
| `AsyncIterator[Decision \| Approval \| Message]` | Execution events in guaranteed order: one Decision, zero or more Approval requests, and at most one final Message. |

## group_genie.preferences.PreferencesSource

Bases: `ABC`

Abstract base class for providing user-specific preferences.

PreferencesSource supplies user preferences that customize agent behavior and response style. Preferences are typically free-form text (often bullet points) describing formatting, tone, verbosity, and other stylistic choices. Preferences are included in agent prompts to personalize responses without modifying agent system prompts.

Example

```python
class DatabasePreferencesSource(PreferencesSource):
    async def get_preferences(self, username: str) -> str | None:
        user = await database.get_user(username)
        if not user or not user.preferences:
            return None
        return user.preferences
        # Example preferences:
        # "- Prefer concise responses
        #  - Use bullet points for lists
        #  - Include code examples when relevant
        #  - Avoid technical jargon"


class StaticPreferencesSource(PreferencesSource):
    def __init__(self, preferences_map: dict[str, str]):
        self._preferences = preferences_map

    async def get_preferences(self, username: str) -> str | None:
        return self._preferences.get(username)
```

### get_preferences

```python
get_preferences(username: str) -> str | None
```

Retrieve preferences for a specific user.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `username` | `str` | User ID to fetch preferences for. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str \| None` | The user's preferences as free-form text, or None if no preferences are set. |

## group_genie.message.Message

```python
Message(content: str, sender: str, receiver: str | None = None, threads: list[Thread] = list(), attachments: list[Attachment] = list(), request_id: str | None = None)
```

Represents a message in a group chat conversation.
Messages are the primary unit of communication in Group Genie. Messages can include attachments, reference other threads, and optionally specify receivers and correlation IDs.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `content` | `str` | The text content of the message. |
| `sender` | `str` | User ID of the message sender. Use "system" for agent-generated messages. |
| `receiver` | `str \| None` | Optional user ID of the message receiver. |
| `threads` | `list[Thread]` | List of referenced threads from other group chats, providing cross-conversation context. |
| `attachments` | `list[Attachment]` | List of files attached to this message. |
| `request_id` | `str \| None` | Optional correlation ID; an agent response carries the request_id of the message that triggered it. |

Example

```python
# Simple message
message = Message(content="Hello", sender="alice")

# Message with attachment and receiver
message = Message(
    content="Please review this document",
    sender="alice",
    receiver="bob",
    attachments=[Attachment(
        path="/tmp/doc.pdf",
        name="Document",
        media_type="application/pdf"
    )],
    request_id="req123"
)

# Process message
execution = session.handle(message)
response = await execution.result()

# Response will have same request_id
assert response.request_id == "req123"
```

### deserialize

```python
deserialize(message_dict: dict[str, Any]) -> Message
```

Reconstruct a Message from a dictionary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `message_dict` | `dict[str, Any]` | Dictionary containing message data with nested Thread and Attachment dictionaries, typically obtained from calling asdict() on a Message instance. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Message` | A Message instance with all nested objects properly deserialized. |

## group_genie.message.Attachment

```python
Attachment(path: str, name: str, media_type: str)
```

Metadata for files attached to group chat messages.

Attachments represent files (images, documents, etc.) that accompany messages. They reference local filesystem paths and provide metadata for agents to understand and process the files. The file at the specified path must exist when bytes() is called, otherwise an error is raised and the agent run fails.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `str` | Local filesystem path to the attachment file. |
| `name` | `str` | Display name of the attachment. |
| `media_type` | `str` | MIME type of the attachment (e.g., 'image/png', 'application/pdf'). |

Example

```python
attachment = Attachment(
    path="/tmp/report.pdf",
    name="Monthly Report",
    media_type="application/pdf"
)

message = Message(
    content="Please review this report",
    sender="alice",
    attachments=[attachment]
)
```

### media_type

```python
media_type: str
```

MIME type of the attachment.

### name

```python
name: str
```

Name of the attachment.

### path

```python
path: str
```

Local file path to the attachment.

### bytes

```python
bytes() -> bytes
```

Read the attachment file contents.

Returns:

| Type | Description |
| --- | --- |
| `bytes` | The raw bytes of the attachment file. |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If the file at path does not exist. |

### deserialize

```python
deserialize(attachment_dict: dict[str, Any]) -> Attachment
```

Reconstruct an Attachment from a dictionary.
Parameters: | Name | Type | Description | Default | | ----------------- | ---------------- | ---------------------------------------------------------------------------------------------------------- | ---------- | | `attachment_dict` | `dict[str, Any]` | Dictionary containing attachment data, typically obtained from calling asdict() on an Attachment instance. | *required* | Returns: | Type | Description | | ------------ | ----------------------- | | `Attachment` | An Attachment instance. | ## group_genie.message.Thread ```python Thread(id: str, messages: list[Message]) ``` Reference to a conversation thread from another group chat. Threads allow messages to include context from other group conversations, enabling agents to access related discussions. Thread IDs are application-managed and typically correspond to GroupSession IDs. Applications are responsible for loading thread messages from the referenced group session and including them in the Thread object. Attributes: | Name | Type | Description | | ---------- | --------------- | ------------------------------------------------------------------------- | | `id` | `str` | Unique identifier of the referenced thread (typically a GroupSession ID). | | `messages` | `list[Message]` | List of messages from the referenced thread. | Example ```python # Load messages from another session other_session_messages = await GroupSession.load_messages(other_datastore) # Include as thread reference thread = Thread(id="session123", messages=other_session_messages) message = Message( content="Following up on the previous discussion", sender="alice", threads=[thread] ) ``` ### deserialize ```python deserialize(thread_dict: dict[str, Any]) -> Thread ``` Reconstruct a Thread from a dictionary. 
Parameters: | Name | Type | Description | Default | | ------------- | ---------------- | ------------------------------------------------------------------------------------------------------------------------------- | ---------- | | `thread_dict` | `dict[str, Any]` | Dictionary containing thread data with 'id' and 'messages' keys, typically obtained from calling asdict() on a Thread instance. | *required* | Returns: | Type | Description | | -------- | ------------------ | | `Thread` | A Thread instance. | ## group_genie.secrets.SecretsProvider Bases: `ABC` Abstract base class for providing user-specific secrets. SecretsProvider supplies credentials (like API keys) to agents and reasoners on a per-user basis. This enables agents to act on behalf of individual users with their own credentials while preventing unauthorized access to other users' resources. Implementations should return secrets as key-value pairs where keys are credential names (e.g., "GOOGLE_API_KEY") and values are the actual credentials. Example ```python class EnvironmentSecretsProvider(SecretsProvider): def get_secrets(self, username: str) -> dict[str, str] | None: # For development: use environment variables for all users return { "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""), "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""), } class DatabaseSecretsProvider(SecretsProvider): def get_secrets(self, username: str) -> dict[str, str] | None: # For production: fetch user-specific credentials from database user = database.get_user(username) if not user: return None return { "GOOGLE_API_KEY": user.google_api_key, "OPENAI_API_KEY": user.openai_api_key, } ``` ### get_secrets ```python get_secrets(username: str) -> dict[str, str] | None ``` Retrieve secrets for a specific user. Parameters: | Name | Type | Description | Default | | ---------- | ----- | ----------------------------- | ---------- | | `username` | `str` | User ID to fetch secrets for. 
| *required* | Returns: | Type | Description | | ---------------- | ----------- | | \`dict[str, str] \| None\` | Secrets as key-value pairs, or None if no secrets are available for the user. | ## group_genie.datastore.DataStore ```python DataStore(root_path: Path) ``` Persistent storage for session messages and agent state. DataStore provides a simple file-based persistence mechanism for Group Genie sessions. It stores data in JSON files organized in a hierarchical directory structure based on session IDs, owner IDs, and component keys. Key characteristics: - Automatic JSON serialization - Hierarchical key-based organization via narrow() - Asynchronous save operations (non-blocking) - Key sanitization for filesystem safety - No depth limits on hierarchy Note This is an experimental snapshot store for development and testing. Do not use in production. Example ```python # Create data store for a session store = DataStore(root_path=Path(".data/sessions/session123")) # Save data await store.save("messages", {"messages": [...]}) # Load data data = await store.load("messages") # Create narrowed store for a component async with store.narrow("alice") as alice_store: await alice_store.save("agent", agent_state) # Path structure: .data/sessions/session123/alice/agent.json ``` Initialize a data store with a root directory. Parameters: | Name | Type | Description | Default | | ----------- | ------ | ------------------------------------------ | ---------- | | `root_path` | `Path` | Root directory for storing all data files. | *required* | ### load ```python load(key: str) -> Data ``` Load data from storage. Parameters: | Name | Type | Description | Default | | ----- | ----- | ----------------------------------------- | ---------- | | `key` | `str` | Storage key identifying the data to load. | *required* | Returns: | Type | Description | | ------ | ----------------------------------------- | | `Data` | The loaded data (deserialized from JSON).
| Raises: | Type | Description | | ---------- | ------------------------------------- | | `KeyError` | If the key does not exist in storage. | ### narrow ```python narrow(key: str) -> AsyncIterator[DataStore] ``` Create a narrowed data store scoped to a subdirectory. Useful for organizing data hierarchically (e.g., by session, then by user, then by component). The key is sanitized for filesystem safety. Parameters: | Name | Type | Description | Default | | ----- | ----- | ---------------------------------------------------- | ---------- | | `key` | `str` | Subdirectory name. Special characters are sanitized. | *required* | Yields: | Type | Description | | -------------------------- | ---------------------------------------------------- | | `AsyncIterator[DataStore]` | A new DataStore instance rooted at the subdirectory. | Example ```python async with store.narrow("alice") as alice_store: async with alice_store.narrow("agent") as agent_store: await agent_store.save("state", {...}) # Saves to: root_path/alice/agent/state.json ``` ### narrow_path ```python narrow_path(*keys: str) -> Path ``` Compute the path for a narrowed key hierarchy. Useful for checking paths or creating directories outside the narrow() context manager. Parameters: | Name | Type | Description | Default | | ------- | ----- | ----------------------------------------------------- | ------- | | `*keys` | `str` | Sequence of keys defining the subdirectory hierarchy. | `()` | Returns: | Type | Description | | ------ | ------------------------------- | | `Path` | Path to the narrowed directory. | ### save ```python save(key: str, data: Data) -> Future[None] ``` Save data to storage asynchronously. Queues the save operation to execute in the background, allowing the caller to continue without blocking. Parameters: | Name | Type | Description | Default | | ------ | ------ | ----------------------------------------- | ---------- | | `key` | `str` | Storage key for the data.
| *required* | | `data` | `Data` | Data to save (must be JSON-serializable). | *required* | Returns: | Type | Description | | -------------- | ----------------------------------------------------------------------------------------- | | `Future[None]` | A Future that resolves when the save completes. Can be ignored for fire-and-forget saves. | ## group_genie.agent.Agent Bases: `ABC` Abstract base class for creating custom agents. Agents are the core processing units that handle delegated queries from group reasoners. They can be standalone agents or coordinator agents that orchestrate subagents in a hierarchical architecture. Implementations must handle conversation state serialization (via get_serialized and set_serialized), MCP server lifecycle management (via mcp context manager), and query processing with tool approval callbacks. State persistence is managed automatically by the framework and stored in JSON format. Persisted state is never transferred between different owners (users). Example ```python class MyAgent(Agent): def __init__(self, system_prompt: str): self._history = [] self._system_prompt = system_prompt def get_serialized(self): return {"history": self._history} def set_serialized(self, state): self._history = state["history"] @asynccontextmanager async def mcp(self): # Initialize MCP servers if needed yield self async def run(self, input: AgentInput, callback: ApprovalCallback) -> str: # Process query and return response return f"Processed: {input.query}" ``` ### get_serialized ```python get_serialized() -> Any ``` Serialize agent state for persistence. Returns conversation history and any other state needed to resume the agent after a restart. Called automatically by the framework before saving to DataStore. Returns: | Type | Description | | ----- | ----------------------------------------------------------------------------- | | `Any` | Serializable state (must be JSON-compatible). Implementation-specific format. 
| ### mcp ```python mcp() -> AbstractAsyncContextManager[Agent] ``` Context manager for MCP server lifecycle. Manages the lifecycle of any MCP (Model Context Protocol) servers used by this agent. Connects to the agent's MCP servers on entering the context, and disconnects on exit. Returns: | Type | Description | | ------------------------------------ | --------------------------------------- | | `AbstractAsyncContextManager[Agent]` | Async context manager that yields self. | ### run ```python run(input: AgentInput, callback: ApprovalCallback) -> str ``` Process a query and return a response. Executes the agent's core logic to process the query. Must use the provided callback for any tool calls that require approval. Agent execution blocks until all approvals are granted or denied. Parameters: | Name | Type | Description | Default | | ---------- | ------------------ | ------------------------------------------------------------------------------------------------------------------------ | ---------- | | `input` | `AgentInput` | The query and associated data to process. | *required* | | `callback` | `ApprovalCallback` | Async callback for requesting approval of tool calls. Must be called for any tool execution that requires user approval. | *required* | Returns: | Type | Description | | ----- | --------------------------------- | | `str` | The agent's response as a string. | ### set_serialized ```python set_serialized(state: Any) ``` Restore agent state from serialized data. Reconstructs conversation history and internal state from previously serialized data. Called automatically by the framework after loading from DataStore. Parameters: | Name | Type | Description | Default | | ------- | ----- | -------------------------------------------------- | ---------- | | `state` | `Any` | Previously serialized state from get_serialized(). 
| *required* | ## group_genie.agent.AgentInput ```python AgentInput(query: str, attachments: list[Attachment] = list(), preferences: str | None = None) ``` Input data for agent execution. Encapsulates all information needed for an agent to process a query, including the query text, any attached files, and user-specific preferences. Attributes: | Name | Type | Description | | ------------- | ------------------ | --------------------------------------------------------------------------------------------- | | `query` | `str` | The query text for the agent to process. Should be self-contained with all necessary context. | | `attachments` | `list[Attachment]` | List of file attachments that accompany the query. | | `preferences` | \`str | None\` | Example ```python input = AgentInput( query="Analyze this report and summarize key findings", attachments=[Attachment( path="/tmp/report.pdf", name="Q3 Report", media_type="application/pdf" )], preferences="Concise responses, no emojis" ) ``` ## group_genie.agent.AgentInfo ```python AgentInfo(name: str, description: str, emoji: str | None = None, idle_timeout: float | None = None) ``` Metadata about an agent. Provides descriptive information about an agent for configuration purposes. Used by AgentFactory coordinator agents to learn about available subagents. Attributes: | Name | Type | Description | | -------------- | ------- | ---------------------------------------------------------------------------------------------------- | | `name` | `str` | Unique identifier for the agent (e.g., "search", "math", "system"). | | `description` | `str` | Description of the agent's capabilities and purpose. Used by coordinator agents to select subagents. 
| | `emoji` | \`str \| None\` | Optional emoji name displayed for the agent (e.g., "mag"). | | `idle_timeout` | \`float \| None\` | Optional timeout in seconds after which an idle agent instance is stopped to free resources. | Example ```python info = AgentInfo( name="search", description="Searches the web for current information", emoji="mag", idle_timeout=300.0 ) ``` ## group_genie.agent.AgentRunner ```python AgentRunner(key: str, name: str, owner: str, agent_factory: AgentFactory, data_store: DataStore | None = None, extra_tools: dict[str, AsyncTool] | None = None) ``` ### run_subagent ```python run_subagent(query: str, subagent_name: str, subagent_instance: str | None = None, attachments: list[Attachment] = []) -> str ``` Runs a subagent and returns its response. Subagents maintain state between runs. To re-use a subagent instance, e.g. for a follow-up query or an ongoing conversation with a subagent, set `subagent_instance` to the instance id of a previously created subagent instance. Pass attachments metadata to the subagent only if you think it is required by the subagent to process the query. If you have received attachments in a query message and already extracted the required information from them, do not pass them to the subagent. Parameters: | Name | Type | Description | Default | | ------------------- | ------------------ | ------------------------------------------------- | ----------------------------------------------------------------------------------------------------- | | `query` | `str` | The query to run the subagent with. | *required* | | `subagent_name` | `str` | The name of the subagent to run. | *required* | | `subagent_instance` | \`str \| None\` | The 8-digit hex instance id of the subagent to run. If null, a new subagent instance will be created. | `None` | | `attachments` | `list[Attachment]` | The attachments metadata to pass to the subagent.
| `[]` | Returns: | Type | Description | | ----- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `str` | A JSON string containing the subagent name, 8-digit hex instance id, and response, e.g. { "subagent_name": subagent name, "subagent_instance": subagent 8-digit hex instance id, "subagent_response": subagent response, } | Raises: | Type | Description | | ------------ | ------------------------------------------- | | `ValueError` | If the name of the subagent does not exist. | ## group_genie.agent.Approval ```python Approval(sender: str, tool_name: str, tool_args: tuple, tool_kwargs: dict[str, Any], ftr: Future[bool]) ``` Represents a tool call awaiting user approval. Approval objects are emitted by Execution.stream() when an agent attempts to call a tool that requires approval. Applications must approve or deny the request by calling approve() or deny(), which unblocks the agent execution. Attributes: | Name | Type | Description | | ------------- | ---------------- | -------------------------------------------------------------------------------------------- | | `sender` | `str` | Identifier of the agent or subagent requesting approval (e.g., "system", "search:a1b2c3d4"). | | `tool_name` | `str` | Name of the tool being called. | | `tool_args` | `tuple` | Positional arguments for the tool call. | | `tool_kwargs` | `dict[str, Any]` | Keyword arguments for the tool call. | | `ftr` | `Future[bool]` | Internal future for communicating the approval decision. | Example ```python async for elem in execution.stream(): match elem: case Approval() as approval: print(f"Tool call: {approval.call_repr()}") if is_safe(approval.tool_name): approval.approve() else: approval.deny() ``` ### approve ```python approve() ``` Approve the tool call and unblock agent execution. 
Allows the agent to proceed with the tool execution. The agent will receive the tool's result. ### approved ```python approved() -> bool ``` Wait for and return the approval decision. Blocks until approve() or deny() is called, then returns the decision. Returns: | Type | Description | | ------ | ---------------------------------- | | `bool` | True if approved, False if denied. | ### call_repr ```python call_repr() -> str ``` Get a string representation of the tool call. ### deny ```python deny() ``` Deny the tool call and unblock agent execution. Prevents the tool from executing. The agent will receive a denial message (implementation-specific behavior). ## group_genie.agent.ApprovalCallback ```python ApprovalCallback = Callable[[str, dict[str, Any]], Awaitable[bool]] ``` Callback function type for requesting approval of tool calls. When called, approval is requested and blocks until the application approves or denies the request. This callback is typically provided by ApprovalContext.approval_callback() and passed to Agent.run() to enable approval workflows. Parameters: | Name | Type | Description | Default | | ----------- | ---- | ------------------------------------ | ---------- | | `tool_name` | | Name of the tool being called. | *required* | | `tool_args` | | Keyword arguments for the tool call. | *required* | Returns: | Type | Description | | ---- | --------------------------------------------------- | | | True if the tool call is approved, False if denied. | ## group_genie.agent.ApprovalContext ```python ApprovalContext(queue: Queue[Approval], auto_approve: bool = False) ``` Context for managing the approval workflow. ApprovalContext coordinates approval requests between agents and the application. It manages a queue of Approval objects that are emitted through Execution.stream() and provides callbacks for agents to request approval. 
When auto_approve is enabled, all tool calls are automatically approved and Approval objects are not emitted through the stream. Attributes: | Name | Type | Description | | -------------- | ----------------- | -------------------------------------------------------------------------------------------- | | `queue` | `Queue[Approval]` | Queue for Approval objects that need user attention. | | `auto_approve` | `bool` | If True, automatically approve all tool calls without emitting Approvals. Defaults to False. | Example ```python # Auto-approve mode (used by Execution.result()) context = ApprovalContext(queue=queue, auto_approve=True) # Manual approval mode (used by Execution.stream()) context = ApprovalContext(queue=queue, auto_approve=False) ``` ### approval ```python approval(sender: str, tool_name: str, tool_args: dict[str, Any]) -> bool ``` Request approval for a tool call. If auto_approve is enabled, immediately returns True. Otherwise, creates an Approval object, adds it to the queue for the application to handle, and blocks until approve() or deny() is called. Parameters: | Name | Type | Description | Default | | ----------- | ---------------- | -------------------------------------------- | ---------- | | `sender` | `str` | Identifier of the agent requesting approval. | *required* | | `tool_name` | `str` | Name of the tool being called. | *required* | | `tool_args` | `dict[str, Any]` | Arguments for the tool call. | *required* | Returns: | Type | Description | | ------ | ---------------------------------- | | `bool` | True if approved, False if denied. | ### approval_callback ```python approval_callback(sender: str) -> ApprovalCallback ``` Create an approval callback for a specific sender. Parameters: | Name | Type | Description | Default | | -------- | ----- | -------------------------------------------- | ---------- | | `sender` | `str` | Identifier of the agent requesting approval. 
| *required* | Returns: | Type | Description | | ------------------ | ---------------------------------------------------- | | `ApprovalCallback` | Callback function that can be passed to Agent.run(). | ## group_genie.agent.AgentFactory ```python AgentFactory(system_agent_factory: SingleAgentFactoryFn | MultiAgentFactoryFn, system_agent_info: AgentInfo | None = None, secrets_provider: SecretsProvider | None = None) ``` Factory for creating agent instances. AgentFactory provides centralized agent creation and configuration. It supports two types of agents: 1. Standalone agents (SingleAgentFactoryFn): Simple agents that process queries independently without subagent orchestration. 1. Coordinator agents (MultiAgentFactoryFn): Complex agents that can run other agents as subagents, receiving information about available subagents and extra tools (like run_subagent). The factory automatically provides user-specific secrets to agents and maintains agent metadata for introspection. Example ```python # Standalone agent factory def create_search_agent(secrets: dict[str, str]) -> Agent: return DefaultAgent( system_prompt="You are a search specialist", model="gemini-2.5-flash", builtin_tools=[WebSearchTool()], ) # Coordinator agent factory def create_coordinator( secrets: dict[str, str], extra_tools: dict[str, AsyncTool], agent_infos: list[AgentInfo] ) -> Agent: # Has access to run_subagent tool and info about subagents return DefaultAgent( system_prompt=f"Available subagents: {agent_infos}", tools=[extra_tools["run_subagent"]], ) # Create factory factory = AgentFactory( system_agent_factory=create_coordinator, secrets_provider=my_secrets_provider, ) # Register subagents factory.add_agent_factory_fn( factory_fn=create_search_agent, info=AgentInfo(name="search", description="Web search specialist") ) ``` Initialize the agent factory. 
Parameters: | Name | Type | Description | Default | | ---------------------- | ---------------------- | --------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `system_agent_factory` | \`SingleAgentFactoryFn | MultiAgentFactoryFn\` | Factory function for creating the main system agent. Can be either SingleAgentFactoryFn (takes only secrets) or MultiAgentFactoryFn (takes secrets, extra_tools, and agent_infos). | | `system_agent_info` | \`AgentInfo | None\` | Optional metadata for the system agent. Defaults to a basic AgentInfo with name="system" and 600s idle timeout. | | `secrets_provider` | \`SecretsProvider | None\` | Optional provider for user-specific secrets (e.g., API keys). | ### add_agent_factory_fn ```python add_agent_factory_fn(factory_fn: SingleAgentFactoryFn | MultiAgentFactoryFn, info: AgentInfo) ``` Register a new agent factory function. Adds a factory function that can create agents of a specific type. The agent can then be used as a subagent by coordinator agents. Parameters: | Name | Type | Description | Default | | ------------ | ---------------------- | ----------------------------------------------------------------- | --------------------------------------------------------------------------------------------------- | | `factory_fn` | \`SingleAgentFactoryFn | MultiAgentFactoryFn\` | Factory function for creating the agent. Can be either SingleAgentFactoryFn or MultiAgentFactoryFn. | | `info` | `AgentInfo` | Metadata about the agent (name, description, idle timeout, etc.). | *required* | ### agent_info ```python agent_info(name: str) -> AgentInfo ``` Get metadata for a specific agent by name. Parameters: | Name | Type | Description | Default | | ------ | ----- | ------------------ | ---------- | | `name` | `str` | Name of the agent. 
| *required* | Returns: | Type | Description | | ----------- | ---------------------------------- | | `AgentInfo` | AgentInfo for the specified agent. | ### agent_infos ```python agent_infos(exclude: str | None = None) -> list[AgentInfo] ``` Get metadata for all registered agents. Parameters: | Name | Type | Description | Default | | --------- | ----- | ----------- | -------------------------------------------------------------------------------------------------------------------------- | | `exclude` | \`str | None\` | Optional agent name to exclude from the results (e.g., exclude the coordinator agent itself when providing subagent info). | Returns: | Type | Description | | ----------------- | -------------------------------------------------------------------- | | `list[AgentInfo]` | List of AgentInfo for all registered agents except the excluded one. | ### create_agent ```python create_agent(name: str, owner: str, extra_tools: dict[str, AsyncTool] | None = None) -> Agent ``` Create an agent by name for a specific owner. Looks up the registered factory function for the given name and creates an agent instance. Parameters: | Name | Type | Description | Default | | ------------- | ---------------------- | ----------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------- | | `name` | `str` | Name of the agent to create (must be registered via add_agent_factory_fn or be "system"). | *required* | | `owner` | `str` | User ID of the agent owner. | *required* | | `extra_tools` | \`dict[str, AsyncTool] | None\` | Optional additional tools to provide to the agent. Only used for MultiAgentFactoryFn agents. | Returns: | Type | Description | | ------- | ---------------------------------------------- | | `Agent` | A new Agent instance configured for the owner. 
| ### create_system_agent ```python create_system_agent(owner: str, extra_tools: dict[str, AsyncTool]) -> Agent ``` Create the main system agent for a specific owner. Parameters: | Name | Type | Description | Default | | ------------- | ---------------------- | ----------------------------------------------------------------------------------------- | ---------- | | `owner` | `str` | User ID of the agent owner. | *required* | | `extra_tools` | `dict[str, AsyncTool]` | Additional tools provided by the framework (e.g., run_subagent, get_group_chat_messages). | *required* | Returns: | Type | Description | | ------- | ---------------------------- | | `Agent` | A new system Agent instance. | ### system_agent_info ```python system_agent_info() -> AgentInfo ``` Get metadata for the system agent. Returns: | Type | Description | | ----------- | ------------------------------- | | `AgentInfo` | AgentInfo for the system agent. | ## group_genie.agent.SingleAgentFactoryFn ```python SingleAgentFactoryFn = Callable[[dict[str, str]], Agent] ``` Factory function signature for creating standalone agents. Creates agents that process queries independently without orchestrating subagents. These are "leaf" agents in an agent hierarchy. Parameters: | Name | Type | Description | Default | | --------- | ---------------- | ---------------------------------------------------------------------------------------------------------------------------------------- | ---------- | | `secrets` | `dict[str, str]` | User-specific credentials (e.g., API keys) retrieved from a SecretsProvider. Common keys include "GOOGLE_API_KEY", "BRAVE_API_KEY", etc. | *required* | Returns: | Type | Description | | ---- | ----------------------------------------------------- | | | A configured Agent instance ready to process queries. 
| Example ```python def create_search_agent(secrets: dict[str, str]) -> Agent: model = GoogleModel( "gemini-2.5-flash", provider=GoogleProvider(api_key=secrets.get("GOOGLE_API_KEY", "")), ) return DefaultAgent( system_prompt="You are a web search specialist", model=model, builtin_tools=[WebSearchTool()], ) ``` ## group_genie.agent.MultiAgentFactoryFn ```python MultiAgentFactoryFn = Callable[[dict[str, str], dict[str, AsyncTool], list[AgentInfo]], Agent] ``` Factory function signature for creating coordinator agents. Creates agents that can orchestrate other agents as subagents. These coordinator agents receive information about available subagents and framework-provided tools like `run_subagent` to delegate work. Parameters: | Name | Type | Description | Default | | ------------- | ---------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------- | | `secrets` | `dict[str, str]` | User-specific credentials (e.g., API keys) retrieved from a SecretsProvider. | *required* | | `extra_tools` | `dict[str, AsyncTool]` | Framework-provided tools. Always includes run_subagent for delegating to subagents. May include get_group_chat_messages and other tools depending on the framework configuration. | *required* | | `agent_infos` | `list[AgentInfo]` | Metadata about all other registered agents (excluding the coordinator itself). Used to inform the coordinator what subagents are available. Each entry is an AgentInfo instance. | *required* | Returns: | Type | Description | | ---- | --------------------------------------------------------------- | | | A configured Agent instance capable of orchestrating subagents. 
| Example ```python def create_coordinator( secrets: dict[str, str], extra_tools: dict[str, AsyncTool], agent_infos: list[AgentInfo], ) -> Agent: system_prompt = f"You can delegate to: {[a.name for a in agent_infos]}" return DefaultAgent( system_prompt=system_prompt, model="gemini-2.5-flash", tools=[extra_tools["run_subagent"]], ) ``` ## group_genie.agent.Decision ```python Decision = Decision ``` ## group_genie.reasoner.GroupReasoner Bases: `ABC` Abstract base class for group reasoning logic. Group reasoners analyze incoming group chat messages and decide whether to ignore them or generate a query for downstream agents. They maintain conversation history across update messages supplied via run() calls. State persistence is managed automatically by the framework and stored in JSON format. Persisted state is never transferred between different owners (users). Example ```python class MyGroupReasoner(GroupReasoner): def __init__(self, system_prompt: str): self._history = [] self._processed = 0 self._system_prompt = system_prompt @property def processed(self) -> int: return self._processed def get_serialized(self): return {"history": self._history, "processed": self._processed} def set_serialized(self, state): self._history = state["history"] self._processed = state["processed"] async def run(self, updates: list[Message]) -> Response: # Analyze messages and decide self._processed += len(updates) return Response(decision=Decision.DELEGATE, query="...") ``` ### processed ```python processed: int ``` Number of messages processed so far by this reasoner. Used for tracking conversation history and providing context to the reasoner. ### get_serialized ```python get_serialized() -> Any ``` Serialize reasoner state for persistence. Returns conversation history and any other state needed to resume the reasoner after a restart. Called automatically by the framework before saving to DataStore. 
Returns: | Type | Description | | ----- | ----------------------------------------------------------------------------- | | `Any` | Serializable state (must be JSON-compatible). Implementation-specific format. | ### run ```python run(updates: list[Message]) -> Response ``` Analyze message updates and decide whether to delegate. Processes new group messages in the context of the entire conversation history and decides whether to ignore them or generate a query for agent processing. Parameters: | Name | Type | Description | Default | | --------- | --------------- | --------------------------------------------------------------------------------------------------------------- | ---------- | | `updates` | `list[Message]` | List of new messages to process. Must not be empty. Represents messages that arrived since the last run() call. | *required* | Returns: | Type | Description | | ---------- | ------------------------------------------------------------------------------------------------------------------------------- | | `Response` | Response from group-sense containing the decision (IGNORE or DELEGATE) and optional delegation parameters (query and receiver). | ### set_serialized ```python set_serialized(serialized: Any) ``` Restore reasoner state from serialized data. Reconstructs conversation history and internal state from previously serialized data. Called automatically by the framework after loading from DataStore. Parameters: | Name | Type | Description | Default | | ------------ | ----- | -------------------------------------------------- | ---------- | | `serialized` | `Any` | Previously serialized state from get_serialized(). | *required* | ## group_genie.reasoner.GroupReasonerFactory ```python GroupReasonerFactory(group_reasoner_factory_fn: GroupReasonerFactoryFn, group_reasoner_idle_timeout: float | None = None, secrets_provider: SecretsProvider | None = None) ``` Bases: `GroupReasonerFactory` Factory for creating group reasoner instances. 
GroupReasonerFactory creates reasoner instances customized for specific users (owners). It provides user-specific secrets and stores idle timeout configuration. Each user typically gets their own reasoner instance to maintain independent reasoning state and conversation history.

Example

```python
def create_reasoner(secrets: dict[str, str], owner: str) -> GroupReasoner:
    template = "You are assisting {owner} in a group chat..."
    system_prompt = template.format(owner=owner)
    return DefaultGroupReasoner(system_prompt=system_prompt)

factory = GroupReasonerFactory(
    group_reasoner_factory_fn=create_reasoner,
    group_reasoner_idle_timeout=600,
    secrets_provider=my_secrets_provider,
)

# Factory creates reasoner for specific user
reasoner = factory.create_group_reasoner(owner="alice")
```

Initialize the group reasoner factory.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `group_reasoner_factory_fn` | `GroupReasonerFactoryFn` | Factory function that creates a GroupReasoner for a specific owner. Receives secrets and owner ID. | *required* |
| `group_reasoner_idle_timeout` | `float \| None` | Optional timeout in seconds after which an idle reasoner is stopped to free resources. Defaults to 600s (10 minutes). | `None` |
| `secrets_provider` | `SecretsProvider \| None` | Optional provider for user-specific secrets (e.g., API keys). | `None` |

### create_group_reasoner

```python
create_group_reasoner(owner: str, **kwargs: Any) -> GroupReasoner
```

Create a group reasoner instance for a specific owner.

Retrieves secrets for the owner and creates a reasoner instance using the factory function.
Parameters: | Name | Type | Description | Default | | ---------- | ----- | ------------------------------------------------------------ | ---------- | | `owner` | `str` | User ID of the reasoner owner. | *required* | | `**kwargs` | `Any` | Additional keyword arguments passed to the factory function. | `{}` | Returns: | Type | Description | | --------------- | ------------------------------------------------------ | | `GroupReasoner` | A new GroupReasoner instance configured for the owner. | ## group_genie.reasoner.GroupReasonerFactoryFn ```python GroupReasonerFactoryFn = Callable[[dict[str, str], str], GroupReasoner] ``` Factory function signature for creating group reasoners. Creates reasoner instances customized for specific users (owners). Each user typically gets their own reasoner instance to enable concurrent reasoning for different users. Parameters: | Name | Type | Description | Default | | --------- | ---------------- | ---------------------------------------------------------------------------------------------------------------------------------------- | ---------- | | `secrets` | `dict[str, str]` | User-specific credentials (e.g., API keys) retrieved from a SecretsProvider. Common keys include "GOOGLE_API_KEY", "BRAVE_API_KEY", etc. | *required* | | `owner` | `str` | Username of the reasoner owner. Can be used to personalize behavior (e.g., formatting system prompts with the owner's name). | *required* | Returns: | Type | Description | | ---- | ------------------------------------------------------------ | | | A configured GroupReasoner instance for the specified owner. | Example ```python def create_reasoner(secrets: dict[str, str], owner: str) -> GroupReasoner: template = "You are assisting {owner} in a group chat..." 
system_prompt = template.format(owner=owner) model = GoogleModel( "gemini-2.5-flash", provider=GoogleProvider(api_key=secrets.get("GOOGLE_API_KEY", "")), ) return DefaultGroupReasoner( system_prompt=system_prompt, model=model, ) ``` ## group_genie.agent.provider.pydantic_ai.DefaultAgent ```python DefaultAgent(system_prompt: str, model: str | Model, model_settings: ModelSettings | None = None, toolsets: list[AbstractToolset] = [], tools: list[AsyncTool] = [], builtin_tools: list[AbstractBuiltinTool] = []) ``` Bases: `Stateful`, `Agent` Default `Agent` implementation using [pydantic-ai](https://ai.pydantic.dev/). DefaultAgent is a ready-to-use Agent implementation built on pydantic-ai. It supports conversation state management, tool calling with approval workflows, and MCP server lifecycle management. The agent can be configured with: - Custom system prompts - Any pydantic-ai compatible model - Toolsets (collections of tools, including MCP servers) - Individual tools (async functions) - Built-in tools (like `WebSearchTool`) For model and tool configuration details, consult the pydantic-ai documentation. Example ```python from pydantic_ai.builtin_tools import WebSearchTool from pydantic_ai.models.google import GoogleModelSettings agent = DefaultAgent( system_prompt="You are a helpful assistant", model="gemini-2.5-flash", model_settings=GoogleModelSettings( google_thinking_config={ "thinking_budget": -1, "include_thoughts": True, } ), builtin_tools=[WebSearchTool()], ) ``` Initialize a pydantic-ai based agent. Parameters: | Name | Type | Description | Default | | ---------------- | --------------------------- | ----------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------- | | `system_prompt` | `str` | System prompt that defines the agent's behavior and personality. 
| *required* |
| `model` | `str \| Model` | Model identifier or pydantic-ai Model instance. Can be any model supported by pydantic-ai. | *required* |
| `model_settings` | `ModelSettings \| None` | Optional model-specific settings. See pydantic-ai documentation for available settings per model provider. | `None` |
| `toolsets` | `list[AbstractToolset]` | List of tool collections (including MCP servers). Use this for organized sets of related tools. | `[]` |
| `tools` | `list[AsyncTool]` | List of individual async functions to make available as tools. | `[]` |
| `builtin_tools` | `list[AbstractBuiltinTool]` | List of pydantic-ai built-in tools (e.g., WebSearchTool). | `[]` |

### mcp

```python
mcp()
```

Manage MCP server lifecycle for this agent.

Delegates MCP server management to the underlying pydantic-ai agent, which handles connection and cleanup of any MCP servers included in toolsets.

Yields:

| Type | Description |
| ---- | -------------------- |
|      | This agent instance. |

### run

```python
run(input: AgentInput, callback: ApprovalCallback) -> str
```

Process a query and return a response.

Runs the pydantic-ai agent with the provided query, attachments, and preferences. Tool calls are intercepted and routed through the approval callback, allowing the application to approve or deny tool execution.

Parameters:

| Name | Type | Description | Default |
| ---------- | ------------------ | ------------------------------------------------------------------------------------- | ---------- |
| `input` | `AgentInput` | Query, attachments, and preferences to process. | *required* |
| `callback` | `ApprovalCallback` | Approval callback for tool calls. Called for each tool execution to request approval. | *required* |

Returns:

| Type | Description |
| ----- | --------------------------------- |
| `str` | The agent's response as a string.
|

## group_genie.agent.provider.pydantic_ai.DefaultGroupReasoner

```python
DefaultGroupReasoner(system_prompt: str, model: str | Model | None = None, model_settings: ModelSettings | None = None)
```

Bases: `GroupReasoner`

Default group reasoner implementation using [group-sense](https://gradion-ai.github.io/group-sense/).

DefaultGroupReasoner wraps the group-sense library's DefaultGroupReasoner, adapting Group Genie's Message types to group-sense's message format. The reasoner analyzes group chat messages according to the system prompt's engagement criteria and decides whether to delegate queries to agents.

For model and configuration details, consult the group-sense and pydantic-ai documentation. Tested with gemini-2.5-flash but compatible with any pydantic-ai supported model.

Example

```python
reasoner = DefaultGroupReasoner(
    system_prompt='''
    You are monitoring a group chat for {owner}.
    Delegate when {owner} asks questions.
    Generate self-contained queries.
    '''.format(owner="alice"),
    model="gemini-2.5-flash",
)

# Process messages
response = await reasoner.run([
    Message(content="What's the weather?", sender="alice")
])

if response.decision == Decision.DELEGATE:
    print(f"Query: {response.query}")
```

Initialize a group-sense based reasoner.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `system_prompt` | `str` | System prompt defining the engagement criteria. Should describe when to delegate messages and how to transform them into self-contained queries. | *required* |
| `model` | `str \| Model \| None` | Model identifier or pydantic-ai Model instance. Can be any pydantic-ai supported model. | `None` |
| `model_settings` | `ModelSettings \| None` | Optional model-specific settings. See pydantic-ai documentation for available settings per model provider. | `None` |

### run

```python
run(updates: list[Message]) -> Response
```

Analyze message updates and decide whether to delegate.

Converts Group Genie messages to group-sense format and delegates to the underlying group-sense reasoner for processing.

Parameters:

| Name | Type | Description | Default |
| --------- | --------------- | -------------------------------- | ---------- |
| `updates` | `list[Message]` | List of new messages to analyze. | *required* |

Returns:

| Type | Description |
| ---------- | -------------------------------------------------------------------- |
| `Response` | Response from group-sense with decision and optional query/receiver. |

## group_genie.agent.provider.pydantic_ai.ToolFilter

```python
ToolFilter(included: list[str] | None = None, excluded: list[str] | None = None)
```

Filter function for selectively exposing tools to agents based on whitelists and blacklists.

This class is designed to be passed to pydantic-ai's `FilteredToolset` or the `filtered()` method on any toolset. It implements a callable filter that receives the run context and tool definition for each tool and returns whether the tool should be available.
The filter operates as follows:

- If `included` is specified, only tools in the whitelist are allowed
- If `excluded` is specified, tools in the blacklist are rejected
- If both are specified, a tool must be in `included` and not in `excluded`
- If neither is specified, all tools are allowed

Example

```python
filter = ToolFilter(included=["read_file", "write_file"])
filtered_toolset = my_toolset.filtered(filter)
```

Attributes:

| Name | Type | Description |
| ---------- | ------------------- | ----------- |
| `included` | `list[str] \| None` | Optional whitelist of tool names to allow. |
| `excluded` | `list[str] \| None` | Optional blacklist of tool names to reject. |

## group_genie.agent.provider.openai.DefaultAgent

```python
DefaultAgent(system_prompt: str, model: str | Model, model_settings: ModelSettings, tools: list[Tool] = [], mcp_servers: list[Any] = [], **kwargs: Any)
```

Bases: `Agent`

Default Agent implementation using the [OpenAI Agents SDK](https://openai.github.io/openai-agents-python/).

DefaultAgent is a ready-to-use Agent implementation built on the OpenAI Agents SDK. It supports conversation state management, tool calling with approval workflows, and MCP server lifecycle management.

The agent can be configured with:

- Custom system prompts (instructions)
- Any OpenAI Agents SDK compatible model
- Individual tools (function tools)
- MCP servers for external integrations

For model and tool configuration details, consult the [OpenAI Agents SDK documentation](https://openai.github.io/openai-agents-python/).

Example

```python
from agents import ModelSettings, function_tool

@function_tool
def get_weather(city: str) -> str:
    return f"Weather in {city}: sunny"

agent = DefaultAgent(
    system_prompt="You are a helpful weather assistant",
    model="gpt-4o",
    model_settings=ModelSettings(temperature=0.7),
    tools=[get_weather],
)
```

Initialize an OpenAI Agents SDK based agent.
Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `system_prompt` | `str` | System prompt (instructions) that defines the agent's behavior and personality. | *required* |
| `model` | `str \| Model` | Model identifier or OpenAI Agents SDK Model instance. Can be any model supported by the OpenAI Agents SDK. | *required* |
| `model_settings` | `ModelSettings` | Model-specific settings from the OpenAI Agents SDK. See the SDK documentation for available settings per model provider. | *required* |
| `tools` | `list[Tool]` | List of individual tools (typically function tools created with @function_tool decorator from the OpenAI Agents SDK). | `[]` |
| `mcp_servers` | `list[Any]` | List of MCP server instances from the OpenAI Agents SDK. These will be wrapped with approval interceptors. | `[]` |
| `**kwargs` | `Any` | Additional arguments passed to the underlying OpenAI Agent constructor. | `{}` |

### run

```python
run(input: AgentInput, callback: ApprovalCallback) -> str
```

Process a query and return a response.

Runs the OpenAI Agents SDK agent with the provided query, attachments, and preferences. Tool call approvals are requested through the approval callback, allowing the application to approve or deny tool execution.

Image attachments are converted to base64-encoded data URLs. User preferences are temporarily added to the conversation but removed from the persisted history after execution.

Parameters:

| Name | Type | Description | Default |
| ---------- | ------------------ | ----------- | ---------- |
| `input` | `AgentInput` | Query, attachments, and preferences to process. See AgentInput for details.
| *required* | | `callback` | `ApprovalCallback` | Approval callback for tool calls. Called for each tool execution to request approval. See ApprovalCallback. | *required* | Returns: | Type | Description | | ----- | --------------------------------- | | `str` | The agent's response as a string. | Raises: | Type | Description | | ------------ | -------------------------------------------- | | `ValueError` | If an attachment has a non-image media type. | ### mcp ```python mcp() -> AsyncIterator[DefaultAgent] ``` Manage MCP server lifecycle for this agent. Connects to all configured MCP servers and wraps them with approval interceptors. Creates the underlying OpenAI Agents SDK agent instance with all tools and MCP servers. On exit, disconnects from MCP servers and cleans up the agent. Yields: | Type | Description | | ----------------------------- | -------------------- | | `AsyncIterator[DefaultAgent]` | This agent instance. |
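The approval workflow used by both `DefaultAgent` implementations routes every tool call through a callback before execution. The pattern can be sketched in plain Python. This is an illustration only, not group-genie's actual code: the names `guarded_call` and `approve_reads_only` are hypothetical, and the real `ApprovalCallback` signature may differ in detail.

```python
import asyncio


async def guarded_call(tool, args: dict, approval_callback):
    """Run a tool only if the approval callback allows it."""
    # Every tool execution is routed through the callback first,
    # mirroring the approval workflow described above.
    if not await approval_callback(tool.__name__, args):
        return f"Tool call '{tool.__name__}' denied"
    return await tool(**args)


async def read_file(path: str) -> str:
    return f"<contents of {path}>"


async def delete_file(path: str) -> str:
    return f"deleted {path}"


async def approve_reads_only(tool_name: str, args: dict) -> bool:
    # Automatic approval policy: allow read-only tools, deny the rest.
    return tool_name == "read_file"


async def main():
    allowed = await guarded_call(read_file, {"path": "notes.txt"}, approve_reads_only)
    denied = await guarded_call(delete_file, {"path": "notes.txt"}, approve_reads_only)
    return allowed, denied


allowed, denied = asyncio.run(main())
print(allowed)  # <contents of notes.txt>
print(denied)   # Tool call 'delete_file' denied
```

In manual approval mode, a real callback would prompt a group member instead of applying a static policy; the framework applies the same mechanism across agent hierarchies.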