Runtime

Runtime configuration for the widget and backend.

Agentic Runtime

The agentic runtime is the core execution loop that orchestrates user history, LLM completion calls, and concurrent on-chain execution. This guide covers the key patterns that make AOMI highly concurrent and configurable.

Overview

Concurrency Model

OS Thread per Active Conversation

Each active conversation runs on its own OS thread, so sessions execute in parallel and a failure in one session stays contained to that session.

Benefits

Aspect          Benefit
Isolation       Session failures don't cascade
Parallelism     Multiple LLM calls execute simultaneously
Responsiveness  Long tool calls don't block other users
Scalability     Linear scaling with CPU cores
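
The isolation property can be illustrated with a minimal sketch that uses only the standard library. It is not the AOMI implementation: `run_session` and `run_sessions` are hypothetical names, and the session with id 2 panics on purpose to show that a crash in one thread leaves the others unaffected.

```rust
use std::thread;

/// Hypothetical stand-in for one conversation's work loop.
fn run_session(session_id: u32) -> String {
    if session_id == 2 {
        // Simulate a failure that is confined to this session.
        panic!("session {} crashed", session_id);
    }
    format!("session {} finished", session_id)
}

/// Spawn one OS thread per session and collect each outcome independently.
fn run_sessions(ids: &[u32]) -> Vec<Result<String, String>> {
    let handles: Vec<_> = ids
        .iter()
        .map(|&id| thread::spawn(move || run_session(id)))
        .collect();

    // A panicked thread surfaces as an `Err` from `join`, not as a process crash.
    handles
        .into_iter()
        .map(|h| h.join().map_err(|_| "session thread panicked".to_string()))
        .collect()
}
```

Joining each handle separately is what keeps the failure domains apart: the caller sees one error entry for the crashed session and normal results for the rest.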

User History

Public Key Binding

History is linked to the user's public key (wallet address), enabling cross-session context and wallet-aware conversations:

// History loads automatically based on public key
let session = session_manager
    .get_or_create_session(
        &session_id,
        backend,
        Some(public_key), // Links history to wallet
    )
    .await?;

Configurable History Loading

The HistoryBackend trait allows pluggable persistence strategies:

pub trait HistoryBackend: Send + Sync {
    /// Load history for a public key
    async fn load_history(&self, public_key: &str) -> Result<Vec<Message>>;

    /// Save messages to history
    async fn save_history(&self, public_key: &str, messages: &[Message]) -> Result<()>;

    /// Flush pending writes
    async fn flush(&self) -> Result<()>;
}
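
As an illustration of a pluggable strategy, the sketch below implements an in-memory backend. To stay runnable without an async runtime it uses a synchronous analogue of the trait (`SyncHistoryBackend`), a simplified `Message` type, and `String` errors; all three are assumptions made for this example, as is the choice that `save_history` appends rather than replaces.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Simplified stand-in for the runtime's `Message` type.
#[derive(Clone, Debug, PartialEq)]
struct Message {
    role: String,
    content: String,
}

/// Synchronous analogue of `HistoryBackend`, used only for this sketch.
trait SyncHistoryBackend: Send + Sync {
    fn load_history(&self, public_key: &str) -> Result<Vec<Message>, String>;
    fn save_history(&self, public_key: &str, messages: &[Message]) -> Result<(), String>;
    fn flush(&self) -> Result<(), String>;
}

/// Hypothetical in-memory backend: history lives in a map keyed by public key.
#[derive(Default)]
struct InMemoryHistory {
    store: Mutex<HashMap<String, Vec<Message>>>,
}

impl SyncHistoryBackend for InMemoryHistory {
    fn load_history(&self, public_key: &str) -> Result<Vec<Message>, String> {
        let store = self.store.lock().map_err(|e| e.to_string())?;
        // Unknown keys yield an empty history rather than an error.
        Ok(store.get(public_key).cloned().unwrap_or_default())
    }

    fn save_history(&self, public_key: &str, messages: &[Message]) -> Result<(), String> {
        let mut store = self.store.lock().map_err(|e| e.to_string())?;
        // Assumption: saving appends to whatever is already stored.
        store
            .entry(public_key.to_string())
            .or_insert_with(Vec::new)
            .extend_from_slice(messages);
        Ok(())
    }

    fn flush(&self) -> Result<(), String> {
        // Nothing is buffered here, so flushing is a no-op.
        Ok(())
    }
}
```

A backend like this is mainly useful for tests or ephemeral sessions, since its contents disappear when the process exits.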

Persistent History

// PostgreSQL/SQLite persistence
let backend = PersistentHistoryBackend::new(db_pool).await?;

// History is automatically:
// 1. Loaded on session creation
// 2. Updated after each message
// 3. Flushed periodically in background

Tool Execution

Concurrent Multi-Tool Calls

When the LLM requests multiple tools in a single turn, they execute in parallel.
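
One way to picture parallel tool execution is the sketch below, which runs each requested call on a scoped thread and pairs every result with its call id. The `ToolCall` shape, `execute`, and `run_parallel` are simplified, hypothetical stand-ins rather than the scheduler's real types.

```rust
use std::thread;

/// Hypothetical tool call: an id plus a name; real calls also carry arguments.
struct ToolCall {
    call_id: u32,
    name: &'static str,
}

/// Stand-in for tool execution; a real handler would perform IO here.
fn execute(call: &ToolCall) -> String {
    format!("{} ok", call.name)
}

/// Run every requested tool on its own scoped thread and pair results with call ids.
fn run_parallel(calls: &[ToolCall]) -> Vec<(u32, String)> {
    thread::scope(|scope| {
        let handles: Vec<_> = calls
            .iter()
            .map(|call| scope.spawn(move || (call.call_id, execute(call))))
            .collect();

        // Joining in spawn order keeps results aligned with the request order.
        handles
            .into_iter()
            .map(|h| h.join().expect("tool thread panicked"))
            .collect()
    })
}
```

Carrying the call id alongside each result lets the caller match completions back to the LLM's requests even if the tools finish at different times.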

Request Flow

// 1. LLM requests tool call
let tool_call = ToolCall {
    name: "get_account_info",
    arguments: json!({"address": "vitalik.eth"}),
};

// 2. Scheduler routes to handler
let handler = scheduler.get_handler();
handler.request(&tool_call.name, tool_call.arguments, call_id)?;

// 3. Handler executes and streams result
let stream = handler.take_last_stream();

Multi-Step Async Tools

Long-running tools stream results over time without blocking:

impl MultiStepApiTool for ForgeExecutor {
    async fn call_stream(
        &self,
        request: ExecuteParams,
        sender: Sender<Result<Value>>,
    ) -> Result<()> {
        // Stream progress updates
        sender.send(Ok(json!({"status": "compiling"}))).await?;

        let compiled = self.compile(&request).await?;
        sender.send(Ok(json!({"status": "simulating"}))).await?;

        let result = self.simulate(compiled).await?;
        sender.send(Ok(json!({"status": "complete", "result": result}))).await?;

        Ok(())
    }
}
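
The consuming side of such a stream can be sketched with a standard-library channel and a worker thread. This is an analogy under stated assumptions, not the runtime's code: the `Progress` enum, `run_steps`, and `collect_updates` are hypothetical, and the real tool sends JSON values over an async channel.

```rust
use std::sync::mpsc;
use std::thread;

/// Progress updates for a hypothetical multi-step tool.
#[derive(Debug, PartialEq)]
enum Progress {
    Compiling,
    Simulating,
    Complete(u64),
}

/// Producer: sends one update per step; a send error means the receiver is gone.
fn run_steps(sender: mpsc::Sender<Progress>) -> Result<(), mpsc::SendError<Progress>> {
    sender.send(Progress::Compiling)?;
    sender.send(Progress::Simulating)?;
    sender.send(Progress::Complete(42))?;
    Ok(())
}

/// Consumer: collects updates as they arrive until the sender is dropped.
fn collect_updates() -> Vec<Progress> {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_steps(tx));

    // `rx.iter()` ends once the worker returns and its sender is dropped.
    let updates: Vec<Progress> = rx.iter().collect();
    worker
        .join()
        .expect("worker panicked")
        .expect("receiver dropped early");
    updates
}
```

Because the producer runs on its own thread, the consumer can act on each update as soon as it arrives instead of waiting for the final result.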

IO Scheduler Architecture

Centralized Tool Management

The ToolScheduler acts as an IO Bus: it centralizes all tool execution and routes each call to the appropriate backend.

Global Singleton Pattern

// Get or create the global scheduler
let scheduler = ToolScheduler::get_or_init().await?;

// The scheduler is thread-safe and shared across all sessions
// Tools are registered once at startup
scheduler.register_tool(GetContractABI)?;
scheduler.register_tool(SimulateTransaction)?;
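
The get-or-init pattern itself can be sketched with `std::sync::OnceLock`. The `Registry` type, its methods, and the `echo` tool below are hypothetical and far simpler than `ToolScheduler`; the sketch only shows how a lazily created, thread-safe global can hold a tool table.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

/// Hypothetical tool signature: takes a string argument, returns a string result.
type ToolFn = fn(&str) -> String;

/// Minimal registry standing in for the scheduler's tool table.
struct Registry {
    tools: Mutex<HashMap<&'static str, ToolFn>>,
}

impl Registry {
    /// Return the process-wide registry, creating it on first use.
    fn global() -> &'static Registry {
        static INSTANCE: OnceLock<Registry> = OnceLock::new();
        INSTANCE.get_or_init(|| Registry {
            tools: Mutex::new(HashMap::new()),
        })
    }

    /// Register a tool; duplicate names are rejected.
    fn register(&self, name: &'static str, tool: ToolFn) -> Result<(), String> {
        let mut tools = self.tools.lock().map_err(|e| e.to_string())?;
        if tools.contains_key(name) {
            return Err(format!("tool `{}` already registered", name));
        }
        tools.insert(name, tool);
        Ok(())
    }

    /// Look up a tool by name and invoke it.
    fn call(&self, name: &str, arg: &str) -> Option<String> {
        let tools = self.tools.lock().ok()?;
        tools.get(name).map(|tool| tool(arg))
    }
}

/// Example tool used to exercise the registry.
fn echo(arg: &str) -> String {
    format!("echo: {}", arg)
}
```

Every call to `Registry::global()` returns the same instance, so tools registered at startup are visible from any session thread.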

External Clients

The IO Bus manages connections to external services:

// External clients are initialized once and shared
let clients = external_clients().await;

// Access specific clients
let baml = clients.baml_client()?;
let etherscan = clients.etherscan_client()?;
let cast = clients.cast_client("ethereum")?;

IO Bus Benefits

Benefit             Description
Unified Management  Single source of truth for API clients
Connection Pooling  Shared connections reduce overhead
Rate Limiting       Centralized throttling prevents abuse
Error Handling      Consistent retry and fallback logic
Observability       Centralized logging and metrics

Stream Processing

Tool Result Streams

Tool results flow through typed streams, which the handler polls for completions.

Polling Streams

// Poll all pending streams for next result
while let Some(completion) = handler.poll_streams_to_next_result().await {
    let ToolCompletion {
        call_id,
        tool_name,
        is_multi_step,
        result,
    } = completion;

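    if is_multi_step {
        // Route to system events for UI updates. The yielded command takes
        // ownership of its fields, so clone the values that are still needed
        // below (this assumes `call_id` and `result` implement `Clone`).
        yield ChatCommand::AsyncToolResult {
            call_id: call_id.clone(),
            tool_name,
            result: result.clone(),
        };
    }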

    // Always finalize in chat history
    finalize_tool_result(&mut history, call_id, result);
}

Session State Machine

State Transitions

Update Loop

pub async fn update_state(&mut self) {
    // 1. Process ChatCommand stream (LLM responses)
    while let Ok(msg) = self.receiver_from_llm.try_recv() {
        match msg {
            ChatCommand::StreamingText(text) => {
                self.append_streaming_text(text);
            }
            ChatCommand::ToolCall { topic, stream } => {
                self.handle_tool_call(topic, stream).await;
            }
            ChatCommand::Complete => {
                self.finalize_response();
            }
            // ...
        }
    }

    // 2. Drain system events
    let new_events = self.system_event_queue.slice_from(self.processed_idx);
    self.processed_idx += new_events.len();

    for event in new_events {
        self.handle_system_event(event).await;
    }
}
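
The second step relies on an append-only queue plus a per-reader cursor. A minimal sketch of that idea follows; `EventQueue`, `Reader`, and `drain_new` are hypothetical names, and the real queue may differ (for example, it may be shared across threads behind a lock).

```rust
/// Append-only event log; readers track their own position.
struct EventQueue<T> {
    events: Vec<T>,
}

impl<T: Clone> EventQueue<T> {
    fn new() -> Self {
        EventQueue { events: Vec::new() }
    }

    fn push(&mut self, event: T) {
        self.events.push(event);
    }

    /// Return a copy of every event at or after `idx`; out-of-range yields empty.
    fn slice_from(&self, idx: usize) -> Vec<T> {
        self.events
            .get(idx..)
            .map(|s| s.to_vec())
            .unwrap_or_default()
    }
}

/// Reader that remembers how many events it has already handled.
struct Reader {
    processed_idx: usize,
}

impl Reader {
    /// Fetch unseen events and advance the cursor past them.
    fn drain_new<T: Clone>(&mut self, queue: &EventQueue<T>) -> Vec<T> {
        let new_events = queue.slice_from(self.processed_idx);
        self.processed_idx += new_events.len();
        new_events
    }
}
```

Since events are never removed, several readers can consume the same queue at their own pace, each holding nothing more than an index.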

Performance Considerations

Memory Management

  • Session State: Kept minimal, messages flushed to database
  • Tool Results: Streamed, not buffered entirely
  • History: Sliding window prevents unbounded growth
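
The sliding-window idea from the list above can be sketched with a `VecDeque` that evicts the oldest entry once a capacity is reached. `SlidingHistory` is a hypothetical type for illustration; how the runtime actually sizes or trims its window is not specified here.

```rust
use std::collections::VecDeque;

/// Keeps only the most recent `capacity` messages in memory.
struct SlidingHistory {
    capacity: usize,
    messages: VecDeque<String>,
}

impl SlidingHistory {
    fn new(capacity: usize) -> Self {
        SlidingHistory {
            capacity,
            messages: VecDeque::with_capacity(capacity),
        }
    }

    /// Append a message and return the evicted one, if the window overflowed.
    fn push(&mut self, message: String) -> Option<String> {
        self.messages.push_back(message);
        if self.messages.len() > self.capacity {
            self.messages.pop_front()
        } else {
            None
        }
    }
}
```

Returning the evicted message gives the caller a chance to persist it before it leaves memory, which fits the pattern of flushing older messages to the database.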

Backpressure

Bounded channels prevent memory exhaustion:

// UI channel with backpressure
let (tx, rx) = mpsc::channel::<ChatCommand>(100);

// Tool result channel
let (tool_tx, tool_rx) = mpsc::channel::<Result<Value>>(16);
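
To see what a bounded channel does when it fills up, the sketch below uses the standard library's `sync_channel` as a stand-in for the async channel shown above. `sends_before_full` is a hypothetical helper; with an async bounded channel the sender would typically wait for free capacity instead of receiving a `Full` error.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Push into a bounded channel without a consumer draining it, and report
/// how many sends were accepted before the channel signalled that it was full.
fn sends_before_full(capacity: usize, attempts: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected for the whole loop.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking `send` would wait here instead.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

The buffer never holds more than `capacity` items, so a slow consumer slows the producer down rather than letting queued messages grow without limit.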

Graceful Degradation

// Timeouts on external calls
let result = tokio::time::timeout(
    Duration::from_secs(30),
    client.fetch_contract_abi(address),
).await??;

// Retry with exponential backoff
let result = retry_with_backoff(|| async {
    etherscan.get_source_code(address).await
}, 3).await?;
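
For reference, a retry helper of this kind might look like the synchronous sketch below. It is not the runtime's `retry_with_backoff`: the signature is an assumption (the real helper is async and takes only a closure and an attempt count), and the extra `base_delay` parameter is added here for illustration.

```rust
use std::thread;
use std::time::Duration;

/// Retry `op` up to `max_attempts` times, doubling the delay after each failure.
/// At least one attempt is always made.
fn retry_with_backoff<T, E, F>(
    mut op: F,
    max_attempts: u32,
    base_delay: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                thread::sleep(delay);
                delay *= 2;
                attempt += 1;
            }
        }
    }
}
```

Doubling the delay spaces out repeated requests to a struggling service, and capping the attempt count ensures the caller eventually gets an error instead of waiting forever.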
