Chat

Chat module - Stateful multi-turn conversation sessions

Classes

Class Description
Chat Stateful multi-turn chat session.
Response Completed generation result.
StreamingResponse Streaming generation result that yields tokens incrementa...
AsyncChat Async stateful multi-turn chat session.
AsyncResponse Async completed generation result.
AsyncStreamingResponse Async streaming generation result that yields tokens incr...
ConversationItems Read-only view into conversation history.
Hook Base class for generation hooks.
HookManager Hook dispatcher for generation lifecycle events.
Token Single token from a streaming response.
Usage Token usage statistics.
Timings Generation timing breakdown.
FinishReason Constants for generation stop reasons.
TokenLogprob Log probability for a single token.
ResponseMetadata Generation metadata and debug information.
ResponseFormat Structured output format specification.
ToolCall Tool call requested by the model.
ToolCallFunction Function name and arguments within a tool call.
ToolResult Result of a tool execution.
ToolState Tool execution state for streaming UIs.
ToolStatus Constants for tool execution status.

class Chat

Chat(
    self,
    model: str | None = None,
    client: Client | None = None,
    config: GenerationConfig | None = None,
    system: str | None = None,
    profile: Profile | None = None,
    session_id: str | None = None,
    parent_session_id: str | None = None,
    group_id: str | None = None,
    ttl_ts: int | None = None,
    marker: str = '',
    metadata: dict | None = None,
    source_doc_id: str | None = None,
    prompt_id: str | None = None,
    chat_template: str | PromptTemplate | None = None,
    storage: Database | None = None,
    offline: bool = False,
    _defer_session_update: bool = False
)

Stateful multi-turn chat session.

Chat is the primary interface for talu. Pass a model string to get a fully autonomous chat that handles everything, or pass a client for efficient multi-user serving.

Separation of Concerns
  • Chat manages session state: conversation history, system prompt, templates
  • Client manages infrastructure: model loading, GPU layers, API keys, threading

For custom hardware or backend configuration, create a Client first:

client = Client("model", gpu_layers=20, api_key="...")
chat = client.chat(system="You are helpful.")
Concurrency

Not intended for concurrent use. Create one Chat per thread/task. Sharing across threads can interleave message history unpredictably.
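A minimal sketch of that pattern, assuming a shared Client and using only the documented client.chat() and stream=False call forms (thread layout and prompts are illustrative):

>>> import threading
>>> client = Client("Qwen/Qwen3-0.6B")
>>> def worker(prompt: str) -> None:
...     chat = client.chat()  # one Chat per thread; history is never shared
...     print(chat(prompt, stream=False))
>>> threads = [threading.Thread(target=worker, args=(p,)) for p in ["Hello!", "Hi!"]]
>>> for t in threads: t.start()
>>> for t in threads: t.join()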

Note

Creating multiple Chat instances for the same model is efficient - they share the underlying engine. Only the message history is per-Chat.

Parameters
model

Model to load (HuggingFace ID or local path). Creates a default Client. For custom configuration (GPU layers, API keys, etc.), use Client instead.

client

Existing Client to use (for multi-user serving or custom config).

config

Default GenerationConfig for this session. If provided, these settings are used for all send/stream calls unless overridden.

system

Optional system prompt. Stored as the first message with role="system" (accessible via messages[0]). This follows the HuggingFace chat template convention where system prompts are part of the messages list, not a separate template variable.

profile

Optional storage profile. When provided, chat history is persisted under ~/.talu/db/<profile>/. If session_id is not provided, a UUIDv4 session ID is generated automatically.

session_id

Optional session identifier for this conversation. Used by storage backends to group messages by session. When persisting to TaluDB (when using talu://), session_id is hashed to SESSION_HASH for efficient Jump Reads during session restoration.

parent_session_id

Optional parent session identifier for forks.

marker

Session marker for storage backends (default: "" = normal/unmarked). Values: "pinned", "archived", "deleted", or "" (normal).

metadata

Optional session metadata dict (tags, UI state, notes).

chat_template

Custom chat template to use instead of the model's default. Can be a PromptTemplate object or a template string. If None (default), uses the model's chat_template from tokenizer_config.json.

storage

Storage for messages. Defaults to Database(":memory:"). Use Database("talu://<path>") for TaluDB persistence (requires session_id). Cannot be combined with profile.

offline

If True, disallow network access when resolving model URIs.

Attributes
config

The session's GenerationConfig. This is the single source of truth for generation parameters. Can be read or replaced directly.

messages

List-like access to all messages (including system prompt). The system prompt (if set) appears at index 0 with role="system".

session_id

The session identifier for this conversation, or None.

client

The Client used for this chat (if any).

router

The Router used for generation (if any).

chat_template

The PromptTemplate used for formatting prompts.

Raises
ValidationError

If both model and client are provided.

MemoryError

If Chat creation fails (insufficient memory).

Note

Provide either `model` OR `client`, not both. If neither is provided, Chat works as a lightweight state container (for advanced use).

Configuration Precedence

When calling send/stream, parameters are resolved in this order:
  1. **kwargs (e.g., temperature=0.1) - highest priority
  2. config parameter (explicit GenerationConfig object)
  3. self.config (session default) - lowest priority

Example - Simple chat
>>> chat = Chat("Qwen/Qwen3-0.6B", system="You are helpful.")
>>> response = chat("What is 2+2?")
>>> print(response)
4
Example - Remote backend (use Client for backend config)
>>> client = Client("gpt-4", base_url="http://localhost:8080/v1", api_key="sk-...")
>>> chat = client.chat()
>>> response = chat("Hello!")
Example - Local backend with GPU offload (use Client for hardware config)
>>> client = Client("Qwen/Qwen3-0.6B", gpu_layers=20, num_threads=4)
>>> chat = client.chat()
Example - Multi-turn conversation
>>> response = chat("What is Python?")
>>> response = response.append("What is it used for?")
>>> response = response.append("Give me an example")
Example - Streaming
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> response = chat("Tell me a story", stream=True)
>>> for token in response:
...     print(token, end="", flush=True)
Example - Multi-user serving
>>> client = Client("Qwen/Qwen3-0.6B")
>>> user1 = client.chat(system="You are helpful.")
>>> user2 = client.chat(system="You are a pirate.")
>>> response = user1("Hello!")
>>> response = user2("Ahoy!")
Example - Using GenerationConfig
>>> config = GenerationConfig(temperature=0.7, max_tokens=100)
>>> chat = Chat("model", config=config)
>>> print(chat.config.temperature)  # 0.7
>>> chat.send("Solve: 2+2")  # Uses temp=0.7 automatically
Example - Per-call overrides with kwargs (preferred)
>>> chat = Chat("model", config=GenerationConfig(temperature=0.7))
>>> chat.send("Solve math", temperature=0.1)  # Uses 0.1 for this call only
Example - Per-call overrides with config object
>>> chat.send("Complex task", config=GenerationConfig(top_k=20))
Example - Combined overrides (kwargs win)
>>> chat.send("Hello", config=GenerationConfig(temperature=0.5), temperature=0.1)
>>> # Uses temperature=0.1 (kwargs override config parameter)
Example - Message access
>>> chat = Chat(system="You are helpful.")
>>> chat.items[0]  # Access system prompt item
MessageItem(role='system', content=[...])
>>> chat.items[0].text  # Get text content
'You are helpful.'
>>> chat.clear()  # Clear conversation (keeps system prompt)
>>> chat.reset()  # Reset everything including system prompt

Quick Reference

Properties

Name Type
chat_template PromptTemplate | None
client Client | AsyncClient | None
items ConversationItems
last_response Response | StreamingResponse | AsyncStreamingResponse | None
max_context_length int | None
messages list
owns_client bool
prompt_id str | None
router Router | None
session_id str | None
source_doc_id str | None
system str | None

Methods

Method Description
__call__() Send a message and get a streaming response (ca...
append() Append a message to the conversation.
append_hidden() Append a hidden message to the conversation.
clear() Clear conversation history (keeps system prompt...
close() Close the chat and release resources immediately.
count_tokens() Count tokens in current history or a hypothetic...
fork() Fork this chat to explore alternative conversat...
from_dict() Deserialize from dictionary.
inherit_tags() Copy tags from the prompt document to this conv...
insert() Insert a message at the specified index.
pop() Remove and discard the last message.
preview_prompt() Return the exact formatted prompt that would be...
regenerate() Regenerate the last conversation turn.
remove() Remove message at the specified index.
reset() Reset everything including system prompt.
send() Send a message and get a response (synchronous).
set_item_parent() Set parent_item_id for an item by index.
set_item_validation_flags() Set structured validation flags for an item by ...
to_dict() Serialize to dictionary.
to_json() Get messages as JSON string (from Zig).

Properties

chat_template: PromptTemplate | None

Get the custom chat template, if any.

client: Client | AsyncClient | None

Get the client used by this chat.

items: ConversationItems

Read-only access to conversation as typed Items.

last_response: Response | StreamingResponse | AsyncStreamingResponse | None

Get the last response from generation.

max_context_length: int | None

Get the model's maximum context length.

messages: list

Read-only view of conversation as standard OpenAI-format messages.

owns_client: bool

True if this Chat owns its Client (standalone mode).

prompt_id: str | None

The prompt document ID for this conversation.

When set, this links the conversation to a prompt/persona document. The prompt document can provide the system prompt content and tags that can be inherited via inherit_tags().

This is stored on the Chat object and used by inherit_tags(). For persistent lineage tracking in session records, use source_doc_id.

router: Router | None

Get the router used by this chat.

session_id: str | None

The session identifier for this conversation.

source_doc_id: str | None

The source document ID for lineage tracking.

Links this conversation to the prompt/persona document that spawned it. Used for tracking which document was used to create the conversation.

system: str | None

Get the system prompt.

Methods

def __call__(
    self,
    message: str | list[dict] | MessageItem | list[MessageItem],
    config: GenerationConfig | None = None,
    stream: bool = True,
    on_token: Callable[[str], None] | None = None,
    response_format: type | dict | Grammar | None = None,
    **kwargs: Any
) -> Response | StreamingResponse

Send a message and get a streaming response (callable syntax).

This is the primary way to chat. Call the Chat object directly with your message. By default, returns a StreamingResponse for real-time token display. Use stream=False for complete response.

For async usage, use send_async() instead.

Parameters
message
The user's message. Can be:
  • A string for simple text messages
  • A list of content parts for multimodal input: [{"type": "text", "text": "..."}, {"type": "image", "data": "...", "mime": "image/png"}]
config

Generation configuration override for this call only. Includes structured output settings (schema_strategy, inject_schema_prompt, allow_thinking, max_thinking_tokens).

stream

If True (default), returns StreamingResponse with tokens arriving incrementally. This provides immediate feedback and matches industry standard for chat interfaces (ChatGPT, Claude, etc.).

If False, returns Response after generation completes.
response.text is immediately available.

Why stream=True is default:

Streaming provides real-time feedback as tokens arrive, which:
  • Reduces perceived latency (users see progress immediately)
  • Prevents confusion about "hanging" - long generations (10+ seconds) with no output can appear broken
  • Matches industry standard for chat interfaces

Use stream=True for:

  • Interactive applications (CLIs, chat interfaces)
  • Long generations where users want real-time feedback
  • Applications showing progress indicators
  • Reducing perceived latency for user-facing apps

Use stream=False for:

  • Batch processing (collect all responses at once)
  • Simple scripts where you don't need incremental tokens
  • API endpoints returning JSON with full text
  • Testing/automation where latency doesn't matter
  • Cases requiring deterministic timing

Important: StreamingResponse is single-use. Once exhausted, you cannot iterate again. Access .text after iteration for the full accumulated text.

on_token

Optional callback called for each token (streaming only).

response_format

Dataclass type or JSON schema dict for structured output. When provided, the model output will be constrained to match the schema. Use response.parsed to get a hydrated dataclass instance.

**kwargs

Individual parameter overrides (temperature, max_tokens, etc.) for this call only. Does NOT modify chat.config.

Returns

StreamingResponse: If stream=True (default). Single-use iterator. Cache tokens during iteration if needed later. Access .text after exhaustion for the full accumulated text.

Response: If stream=False. Complete response with .text always available immediately.

Raises
StateError

If no router is available (Chat created without model/client).

ValidationError

If an unknown generation parameter is passed.

StructuredOutputError

If response_format schema setup fails.

Configuration Precedence

Per-call overrides do NOT mutate session state. Priority (high to low):

  1. **kwargs (e.g., temperature=0.1) - this call only
  2. config parameter - this call only
  3. chat.config - session default (unchanged by per-call overrides)

To permanently change session config: chat.config = GenerationConfig(...)

Example - Streaming (default)
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> for token in chat("Tell me a joke"):
...     print(token, end="", flush=True)
Example - Non-streaming
>>> response = chat("What is 2+2?", stream=False)
>>> print(response)
4
Example - Structured output
>>> from dataclasses import dataclass
>>> @dataclass
... class Answer:
...     value: int
>>> response = chat("What is 2+2?", response_format=Answer, stream=False)
>>> response.parsed.value
4
Example - Per-call override (session unchanged)
>>> chat = Chat("model", config=GenerationConfig(temperature=0.7))
>>> response = chat("Hi", temperature=0.1)  # Uses 0.1 for this call
>>> chat.config.temperature  # Still 0.7 (unchanged)
0.7
Example - Multi-turn
>>> response = chat("What is 2+2?", stream=False)
>>> response = response.append("Why?")  # Inherits stream=False

def append(
    self,
    role_or_item: str | MessageItem,
    content: str | None = None,
    hidden: bool = False
) -> Self

Append a message to the conversation.

Can be called with either:
  • Two arguments: append(role, content) - role string and content string
  • One argument: append(item) - a MessageItem object
Parameters
role_or_item

Either a role string ("system", "user", "assistant", "developer") or a MessageItem object.

content

Message content (required when first arg is a role string).

hidden

Hide from UI history while keeping in LLM context.

Returns

self, for chaining.

Raises
ValidationError

If role is invalid or arguments are malformed.

StateError

If append fails.
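A short illustrative sketch of the role/content form and chaining, using the lightweight no-model constructor described in the class Note:

>>> chat = Chat(system="You are helpful.")
>>> chat = chat.append("user", "Hello").append("assistant", "Hi there!")
>>> len(chat.messages)  # system + user + assistant
3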

def append_hidden(
    self,
    role: str,
    content: str
) -> Self

Append a hidden message to the conversation.

Hidden messages are included in LLM context but omitted from UI history.

Parameters
role

Message role ("system", "user", "assistant", "developer").

content

Message text content.

Returns

Self for method chaining.

Raises
ValidationError

If role is not one of the valid roles.

StateError

If the append operation fails.
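A brief sketch of injecting steering context that stays out of the UI history (the instruction text is illustrative):

>>> chat = Chat("Qwen/Qwen3-0.6B", system="You are helpful.")
>>> chat.append_hidden("developer", "Answer in at most two sentences.")
>>> response = chat("Explain transformers.", stream=False)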

def clear(self) -> Self

Clear conversation history (keeps system prompt and settings).

Returns

self, for chaining.

def close(self) -> None

Close the chat and release resources immediately.

If this Chat created its own internal Client (via model="..."), the Client and its Engine are closed, freeing memory.

If this Chat uses a shared Client (via client=...), only the lightweight chat state is freed. The Client stays alive.

Safe to call multiple times.

Example - Explicit cleanup in loops
>>> for model in ["Qwen/0.5B", "Qwen/1.5B", "Qwen/4B"]:
...     chat = Chat(model)
...     print(chat("Hello"))
...     chat.close()  # Free memory before loading next model
Example - Context manager (preferred)
>>> with Chat("Qwen/0.5B") as chat:
...     print(chat("Hello"))
... # Memory freed automatically here

def count_tokens(self, message: str | None = None) -> int

Count tokens in current history or a hypothetical message.

Parameters
message

Optional message to count. If None, counts current history.

Returns

Token count.

Raises
StateError

If no model configured.

GenerationError

If token counting fails.
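A usage sketch that checks whether a follow-up message would still fit the context window (assumes max_context_length is reported for the loaded model):

>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> used = chat.count_tokens()                      # tokens in current history
>>> extra = chat.count_tokens("Summarise our conversation so far.")
>>> if chat.max_context_length is not None:
...     print(used + extra <= chat.max_context_length)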

def fork(self) -> Chat

Fork this chat to explore alternative conversation paths.

Creates an independent copy of the chat with the same history, config, and client reference. Changes to the forked chat do not affect the original.

Returns

New Chat with copied state.

Raises
StateError

If message history cannot be copied.

Example
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> response = chat("I have chicken")
>>>
>>> # Fork to try different directions
>>> asian = response.chat.fork()
>>> italian = response.chat.fork()
>>>
>>> asian("Suggest an Asian recipe")
>>> italian("Suggest an Italian recipe")
>>>
>>> # Original unchanged
>>> print(len(chat.items))  # Same as before forking

def from_dict(
    cls,
    data: dict,
    model: str | None = None
) -> Self

Deserialize from dictionary.

Restores a Chat/AsyncChat from a dict created by to_dict(). Use this to resume conversations from a database or file.

Parameters
data

Dict from to_dict(). If items is provided, full ItemRecord data is loaded; otherwise only OpenAI-format messages are restored.

model

Model to load (HuggingFace ID or local path).

Returns

New instance with restored state.

Raises
StateError

If message loading fails.
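A persistence sketch pairing to_dict() with from_dict(), assuming the serialized dict is JSON-compatible; the file path is illustrative:

>>> import json
>>> chat = Chat("Qwen/Qwen3-0.6B", system="You are helpful.")
>>> chat("Hello!", stream=False)
>>> with open("session.json", "w") as f:
...     json.dump(chat.to_dict(), f)
>>> # Later: restore the conversation and keep chatting
>>> with open("session.json") as f:
...     restored = Chat.from_dict(json.load(f), model="Qwen/Qwen3-0.6B")
>>> restored("What did I just say?", stream=False)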

def inherit_tags(self) -> None

Copy tags from the prompt document to this conversation.

Requires both prompt_id and session_id to be set, and requires TaluDB storage to be configured.

Raises
StateError

If chat is closed.

ValidationError

If prompt_id or session_id is not set.

IOError

If tag inheritance fails.

def insert(
    self,
    index: int,
    role: str,
    content: str,
    hidden: bool = False
) -> Self

Insert a message at the specified index.

Parameters
index

Position to insert at (0-based).

role

Message role ("system", "user", "assistant", "developer").

content

Message text content.

hidden

Hide from UI history while keeping in LLM context.

Returns

self, for chaining.

Raises
ValidationError

If role is invalid.

StateError

If index is out of bounds or insert fails.
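An illustrative sketch (message dicts are assumed to follow the standard OpenAI "role"/"content" keys exposed by chat.messages):

>>> chat = Chat(system="You are helpful.")
>>> chat.append("user", "What is 2+2?")
>>> chat.insert(1, "developer", "Answer with a single number.")
>>> [m["role"] for m in chat.messages]
['system', 'developer', 'user']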

def pop(self) -> Self

Remove and discard the last message.

Returns

self, for chaining.

Raises
StateError

If no messages to remove.

def preview_prompt(
    self,
    add_generation_prompt: bool = True,
    config: GenerationConfig | None = None
) -> str

Return the exact formatted prompt that would be sent to the model.

This is a read-only inspection tool for debugging template logic or verifying system prompts. It does NOT send anything to the engine or affect the conversation state.

Parameters
add_generation_prompt

If True (default), include the assistant turn marker at the end (e.g., "<|im_start|>assistant\n").

config

Optional GenerationConfig. If provided and contains a chat_template, that template will be used instead of the session-level or model default template.

Returns

The formatted prompt string.

Raises
StateError

If no engine is available and no custom template is set.
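A debugging sketch based on the behaviour described above; nothing is sent to the engine:

>>> chat = Chat("Qwen/Qwen3-0.6B", system="You are helpful.")
>>> chat.append("user", "Hello!")
>>> print(chat.preview_prompt())                             # with assistant turn marker
>>> print(chat.preview_prompt(add_generation_prompt=False))  # history only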

def regenerate(
    self,
    message: str | None = None,
    config: GenerationConfig | None = None,
    stream: bool = False,
    response_format: type | dict | Grammar | None = None,
    **kwargs: Any
) -> Response | StreamingResponse

Regenerate the last conversation turn.

This method unwinds the conversation to the previous user message and triggers generation again. Use it to retry a response or edit the last user message.

The operation is atomic: it truncates to the point before the last user message and then sends (either the original or new text). This ensures fresh item IDs and timestamps for auditability.

Parameters
message

Optional new text for the user message. If provided: replaces the last user message with this text. If None: retries with the existing user message text.

config

Generation configuration override.

stream

If True, returns StreamingResponse.

response_format

Dataclass type or JSON schema dict for structured output.

**kwargs

Individual parameter overrides (temperature, max_tokens, etc.)

Returns

Response | StreamingResponse: The new response from regeneration.

Raises
StateError

If no user message exists to regenerate from.

Example
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> chat("Tell me a joke")
>>> # Didn't like the joke? Retry:
>>> chat.regenerate()
>>> # Or edit and retry:
>>> chat.regenerate(message="Tell me a better joke")
>>> # With different parameters:
>>> chat.regenerate(temperature=1.2)

def remove(self, index: int) -> Self

Remove message at the specified index.

Parameters
index

Index of message to remove (0-based).

Returns

self, for chaining.

Raises
StateError

If index is out of bounds.

def reset(self) -> Self

Reset everything including system prompt.

Returns

self, for chaining.

def send(
    self,
    message: str | list[dict] | MessageItem | list[MessageItem],
    config: GenerationConfig | None = None,
    tools: list[Callable[..., Any]] | None = None,
    stream: bool = False,
    on_token: Callable[[str], None] | None = None,
    response_format: type | dict | Grammar | None = None,
    **kwargs: Any
) -> Response | StreamingResponse

Send a message and get a response (synchronous).

This is the explicit sync method. For streaming default, use chat(). For async, use send_async().

Parameters
message
The user's message. Can be:
  • A string for simple text messages
  • A list of content parts for multimodal input: [{"type": "text", "text": "..."}, {"type": "image", "data": "...", "mime": "image/png"}]
config

Generation configuration override. Includes structured output settings (schema_strategy, inject_schema_prompt, allow_thinking, max_thinking_tokens).

tools

Optional list of @tool-decorated functions to enable tool calling.

stream

If True, returns StreamingResponse. If False (default), returns Response.

on_token

Optional callback called for each token (streaming only).

response_format

Dataclass type or JSON schema dict for structured output.

**kwargs

Individual parameter overrides (temperature, max_tokens, etc.).

Returns

Response: If stream=False (default). StreamingResponse: If stream=True.

Raises
StateError

If no router is available (Chat created without model/client).

ValidationError

If an unknown generation parameter is passed.

StructuredOutputError

If response_format schema setup fails.

Example
>>> response = chat.send("What is 2+2?")
>>> print(response)
4
>>> response = response.append("Why?")  # Continues with same mode

def set_item_parent(
    self,
    item_index: int,
    parent_item_id: int | None
) -> None

Set parent_item_id for an item by index.

Raises
StateError

If the operation fails.

def set_item_validation_flags(
    self,
    item_index: int,
    json_valid: bool,
    schema_valid: bool,
    repaired: bool = False
) -> None

Set structured validation flags for an item by index.

Use this after structured parsing/validation to mark JSON/schema validity.

Raises
StateError

If the operation fails.
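A sketch of marking the last item after structured parsing, reusing the Answer dataclass from the structured output example above (the index arithmetic assumes the assistant reply is the final item):

>>> response = chat("What is 2+2?", response_format=Answer, stream=False)
>>> last = len(chat.items) - 1
>>> try:
...     _ = response.parsed
...     chat.set_item_validation_flags(last, json_valid=True, schema_valid=True)
... except Exception:
...     chat.set_item_validation_flags(last, json_valid=False, schema_valid=False)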

def to_dict(self) -> dict

Serialize to dictionary.

def to_json(self) -> str

Get messages as JSON string (from Zig).

Returns

JSON string of messages array in OpenAI Completions format. This is an interchange format and does not include storage-only metadata.


class Response

Response(
    self,
    text: str = '',
    tokens: list[int] | None = None,
    finish_reason: str | None = None,
    usage: Usage | None = None,
    timings: Timings | None = None,
    model: str | None = None,
    logprobs: list[TokenLogprob] | None = None,
    tool_calls: list[ToolCall] | None = None,
    chat: Chat | None = None,
    metadata: ResponseMetadata | None = None,
    _response_format: type | dict | Grammar | None = None,
    _stream_mode: bool = False,
    _msg_index: int | None = None,
    _content: list[ContentPart] | None = None,
    _prompt: str | None = None
)

Completed generation result.

Wraps the result of a non-streaming generation. Behaves like a string for simple use but exposes rich metadata when needed.

The .text property contains the complete generated text, available immediately without iteration. Convert to string with str(response) or access directly via response.text.

Attributes
text

The generated text content (always available immediately).

tokens

List of generated token IDs.

finish_reason

Why generation stopped (eos_token, length, stop_sequence).

usage

Token usage statistics.

timings

Generation timing breakdown.

model

Model identifier that generated this response.

logprobs

Token log probabilities (if requested).

Example - Casual use
>>> response = chat("Hello!")
>>> print(response)  # Works like a string
Hi there!
>>> if "hello" in response.lower():
...     print("Greeting detected")
Example - Power user
>>> response = chat("Hello!")
>>> print(f"Used {response.usage.total_tokens} tokens")
>>> print(f"Finished due to: {response.finish_reason}")
>>> print(f"Model: {response.model}")

Quick Reference

Properties

Name Type
chat Chat | None
content list[ContentPart]
finish_reason str
logprobs list[TokenLogprob] | None
model str | None
parsed Any
prompt str | None
text str
timings Timings | None
tokens list[int]
tool_calls list[ToolCall] | None
usage Usage | None

Methods

Method Description
append() Continue the conversation with a follow-up mess...
endswith() Check if text ends with suffix.
lower() Return text in lowercase.
replace() Replace occurrences in text.
split() Split text.
startswith() Check if text starts with prefix.
strip() Return text with leading/trailing chars removed.
submit_tool_result() Submit a tool result and continue generation.
to_dict() Convert response to a JSON-serializable diction...
upper() Return text in uppercase.

Properties

chat: Chat | None

The Chat that generated this response.

content: list[ContentPart]

Structured content parts for multimodal output symmetry.

Returns a list of content parts, enabling symmetric handling of input and output. For text-only responses, this returns [OutputText(text=...)]. Future multimodal models will return additional part types (OutputImage, etc.).

This property is the source of truth for response content. The .text property is a convenience that concatenates all text parts.

Returns

List of content parts (currently OutputText for text responses).

Example
>>> response = chat("Hello!")
>>> for part in response.content:
...     if part.type == ContentType.OUTPUT_TEXT:
...         print(part.text)
Note

Currently only returns OutputText. Future versions may include OutputImage, OutputAudio, etc. as models evolve.

finish_reason: str

Why generation stopped.

logprobs: list[TokenLogprob] | None

Token log probabilities (if requested).

model: str | None

Model identifier that generated this response.

parsed: Any

Parse and validate the response against the response_format schema.

If a response_format was provided during generation, this property parses the response text as JSON and validates/hydrates it into the specified type (dataclass or Pydantic model).

Returns

The parsed and validated response object, or None if no response_format was specified.

Raises
IncompleteJSONError

If finish_reason is "length" and JSON is malformed.

json.JSONDecodeError

If the response text is not valid JSON.

SchemaValidationError

If the parsed data doesn't match the schema.

prompt: str | None

The fully rendered prompt sent to the model (audit trail).

Contains the exact string that was fed to the model engine after all templating, system prompt injection, and formatting was applied. Useful for debugging template issues and understanding exactly what the model saw.

Returns

The rendered prompt string, or None if not available.

Example
>>> response = chat("Hello!")
>>> print(response.prompt)
<|im_start|>system
You are a helpful assistant.
<|im_end|>
<|im_start|>user
Hello!
<|im_end|>
<|im_start|>assistant
Note

Only available for responses generated through Chat. May be None for responses from remote APIs or when prompt wasn't captured.

text: str

The generated text content.

timings: Timings | None

Generation timing breakdown.

tokens: list[int]

List of generated token IDs.

tool_calls: list[ToolCall] | None

Tool calls requested by the model (if any).

usage: Usage | None

Token usage statistics.

Methods

def append(
    self,
    message: str,
    **kwargs: Any
) -> Response | StreamingResponse

Continue the conversation with a follow-up message (sync).

This is the primary way to have multi-turn conversations. The append uses the same Chat that generated this response, maintaining context.

Auto-Fork Behavior: If the conversation has moved past this response (i.e., more messages were added after this response was generated), append() automatically forks the conversation and truncates it back to this point before sending the new message. This enables intuitive branching where you can append to any previous response without worrying about conversation state.

The append automatically inherits streaming mode from the original response.

Parameters
message

The follow-up message to send.

**kwargs

Generation parameters (temperature, max_tokens, etc.).

Returns

Response if original was non-streaming, StreamingResponse if streaming.

Raises
StateError

If this response has no associated Chat.

Example - Linear conversation
>>> r1 = chat("What is 2+2?")
>>> r2 = r1.append("Why?")      # Continues normally
>>> r3 = r2.append("Thanks!")   # Continues normally
Example - Branching
>>> r1 = chat("Idea 1")
>>> r2 = r1.append("Critique it")   # chat has [Idea 1, Critique]
>>> r3 = r1.append("Expand on it")  # Auto-forks! r3.chat is new
>>> # Original chat unchanged, r3.chat has [Idea 1, Expand]

def endswith(self, suffix: str) -> bool

Check if text ends with suffix.

def lower(self) -> str

Return text in lowercase.

def replace(
    self,
    old: str,
    new: str,
    count: int = -1
) -> str

Replace occurrences in text.

def split(
    self,
    sep: str | None = None,
    maxsplit: int = -1
) -> list[str]

Split text.

def startswith(self, prefix: str) -> bool

Check if text starts with prefix.

def strip(self, chars: str | None = None) -> str

Return text with leading/trailing chars removed.

def submit_tool_result(
    self,
    tool_call_id: str,
    result: Any
) -> Response

Submit a tool result and continue generation.

When the model requests tool calls (via response.tool_calls), execute them and submit the results back using this method. The model will then continue generation with the tool results in context.

Parameters
tool_call_id

The ID from the tool call (tool_call.id).

result

The result to send back (will be JSON serialized if not str).

Returns

New Response from continued generation.

Raises
StateError

If no Chat session is attached.

Example
>>> response = chat("What's the weather?", tools=[get_weather])
>>> while response.tool_calls:
...     for call in response.tool_calls:
...         result = call.execute()
...         response = response.submit_tool_result(call.id, result)
>>> print(response)

def to_dict(self) -> dict[str, Any]

Convert response to a JSON-serializable dictionary.

This solves the "serialization trap" where Response acts like a string but isn't directly JSON serializable. Use this for API responses, logging, or any context requiring JSON.

Returns

Dict with text, finish_reason, model, and usage (if available).

Example - FastAPI endpoint
>>> @app.post("/chat")
... async def chat_endpoint(message: str):
...     response = await chat(message)
...     return response.to_dict()  # JSON serializable
Example - Logging
>>> import json
>>> response = chat("Hello!")
>>> json.dumps(response.to_dict())  # Works!
Example - Custom response structure
>>> result = {
...     "success": True,
...     "data": response.to_dict(),
... }

def upper(self) -> str

Return text in uppercase.


class StreamingResponse

StreamingResponse(
    self,
    stream_iterator: Iterator,
    on_token: Callable[[str], None] | None = None,
    on_complete: Callable[[str], None] | None = None,
    tokens: list[int] | None = None,
    finish_reason: str | None = None,
    usage: Usage | None = None,
    timings: Timings | None = None,
    model: str | None = None,
    logprobs: list[TokenLogprob] | None = None,
    tool_calls: list[ToolCall] | None = None,
    chat: Chat | None = None,
    metadata: ResponseMetadata | None = None,
    _response_format: type | dict | Grammar | None = None,
    _stream_mode: bool = True,
    _hooks: HookManager | None = None,
    _generation_start_time: float | None = None,
    _prompt: str | None = None
)

Streaming generation result that yields tokens incrementally.

Returned when calling chat(stream=True). Iterate over it to receive tokens in real-time. Text accumulates in .text as you iterate.

Streaming Behavior

StreamingResponse objects are single-use iterators. Once exhausted, you cannot iterate again. If you need the full text later, cache it during iteration:

>>> response = chat("Hello", stream=True)
>>> full_text = "".join(response)  # Cache during iteration
>>> print(full_text)

After the stream is exhausted, response.text returns the cached full text and len(response) gives its length. Iterating the same StreamingResponse again yields no further tokens.

Concurrency

Single-consumer. Do not iterate from multiple threads/tasks.

Attributes
text

The accumulated text (grows during iteration, always available after).

tokens

List of generated token IDs (populated after iteration).

finish_reason

Why generation stopped (available after iteration).

usage

Token usage statistics (available after iteration).

timings

Generation timing breakdown (available after iteration).

model

Model identifier that generated this response.

Example
>>> response = chat("Tell me a joke", stream=True)
>>> for token in response:
...     print(token, end="", flush=True)
>>> print()
>>> print(f"Full text: {response.text}")
>>> print(f"Tokens used: {response.usage.total_tokens}")
Example - With callback
>>> def on_token(t): print(t, end="")
>>> response = chat("Hello", stream=True, on_token=on_token)
>>> for _ in response: pass  # Drain to trigger callbacks
Note

After iteration completes, you can access .text for the full accumulated text and .usage/.timings for metadata.

Quick Reference

Properties

Name Type
chat Chat | None
content list[ContentPart]
finish_reason str
logprobs list[TokenLogprob] | None
model str | None
prompt str | None
text str
timings Timings | None
tokens list[int]
tool_calls list[ToolCall] | None
usage Usage | None

Methods

Method Description
append() Continue the conversation with a follow-up mess...
endswith() Check if text ends with suffix.
lower() Return text in lowercase.
replace() Replace occurrences in text.
split() Split text.
startswith() Check if text starts with prefix.
strip() Return text with leading/trailing chars removed.
to_dict() Convert response to a JSON-serializable diction...
upper() Return text in uppercase.

Properties

chat: Chat | None

The Chat that generated this response.

content: list[ContentPart]

Structured content parts for multimodal output symmetry.

Returns a list of content parts, enabling symmetric handling of input and output. For text-only responses, this returns [OutputText(text=...)]. Future multimodal models will return additional part types (OutputImage, etc.).

This property is the source of truth for response content. The .text property is a convenience that concatenates all text parts.

Returns

List of content parts (currently OutputText for text responses).

Example
>>> response = chat("Hello!")
>>> for part in response.content:
...     if part.type == ContentType.OUTPUT_TEXT:
...         print(part.text)
Note

Currently only returns OutputText. Future versions may include OutputImage, OutputAudio, etc. as models evolve.

finish_reason: str

Why generation stopped.

logprobs: list[TokenLogprob] | None

Token log probabilities (if requested).

model: str | None

Model identifier that generated this response.

prompt: str | None

The fully rendered prompt (available after iteration completes).

For streaming responses, the prompt is captured after iteration finishes since messages are added during streaming. Access this property after consuming the stream.

Returns

The rendered prompt string, or None if iteration hasn't completed or the prompt couldn't be captured.

text: str

The generated text content.

For StreamingResponse, accessing this property will auto-drain the stream if it hasn't been consumed yet. This ensures that `.text` always returns the complete generated text, regardless of whether the caller explicitly iterated over the response.

Returns

The full generated text content.
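A short sketch of the auto-drain behaviour:

>>> response = chat("Tell me a joke", stream=True)
>>> print(response.text)    # drains the remaining tokens, then returns everything
>>> response.finish_reason  # metadata is available once the stream is consumed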

timings: Timings | None

Generation timing breakdown.

tokens: list[int]

List of generated token IDs.

tool_calls: list[ToolCall] | None

Tool calls requested by the model (if any).

usage: Usage | None

Token usage statistics.

Methods

def append(
    self,
    message: str,
    **kwargs: Any
) -> StreamingResponse

Continue the conversation with a follow-up message.

Returns a StreamingResponse (inherits streaming mode from this response). See Response.append() for full documentation including auto-fork behavior.

Parameters
message

The follow-up message text.

**kwargs

Generation overrides (temperature, max_tokens, etc.).

Raises
StateError

If this response has no associated Chat.

def endswith(self, suffix: str) -> bool

Check if text ends with suffix.

def lower(self) -> str

Return text in lowercase.

def replace(
    self,
    old: str,
    new: str,
    count: int = -1
) -> str

Replace occurrences in text.

def split(
    self,
    sep: str | None = None,
    maxsplit: int = -1
) -> list[str]

Split text.

def startswith(self, prefix: str) -> bool

Check if text starts with prefix.

def strip(self, chars: str | None = None) -> str

Return text with leading/trailing chars removed.

def to_dict(self) -> dict[str, Any]

Convert response to a JSON-serializable dictionary.

This solves the "serialization trap" where Response acts like a string but isn't directly JSON serializable. Use this for API responses, logging, or any context requiring JSON.

Returns

Dict with text, finish_reason, model, and usage (if available).

Example - FastAPI endpoint
>>> @app.post("/chat")
... async def chat_endpoint(message: str):
...     response = await chat(message)
...     return response.to_dict()  # JSON serializable
Example - Logging
>>> import json
>>> response = chat("Hello!")
>>> json.dumps(response.to_dict())  # Works!
Example - Custom response structure
>>> result = {
...     "success": True,
...     "data": response.to_dict(),
... }

def upper(self) -> str

Return text in uppercase.


class AsyncChat

AsyncChat(
    self,
    model: str | None = None,
    client: AsyncClient | None = None,
    config: GenerationConfig | None = None,
    system: str | None = None,
    session_id: str | None = None,
    parent_session_id: str | None = None,
    group_id: str | None = None,
    ttl_ts: int | None = None,
    marker: str = '',
    metadata: dict | None = None,
    source_doc_id: str | None = None,
    prompt_id: str | None = None,
    chat_template: str | PromptTemplate | None = None,
    storage: Database | None = None,
    offline: bool = False,
    _defer_session_update: bool = False
)

Async stateful multi-turn chat session.

AsyncChat is the async equivalent of Chat. Use it for building async applications (FastAPI, aiohttp, etc.) where you need non-blocking generation operations.

All generation methods (send(), __call__()) are async and must be awaited.

Separation of Concerns
  • AsyncChat manages session state: conversation history, system prompt, templates
  • AsyncClient manages infrastructure: model loading, GPU layers, API keys, threading

For custom hardware or backend configuration, create an AsyncClient first:

async with AsyncClient("model", gpu_layers=20, api_key="...") as client:
    chat = client.chat(system="You are helpful.")
Architecture

AsyncChat shares the same Zig backend as Chat. Model weights are cached globally, so creating AsyncChat for the same model as an existing Chat shares memory efficiently.

Concurrency

Safe to share across asyncio tasks. Not thread-safe across OS threads. Each task should maintain its own conversation flow to avoid interleaving.
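A minimal sketch of concurrent, independent sessions over a shared AsyncClient using asyncio.gather (task layout and prompts are illustrative):

>>> import asyncio
>>> async def main():
...     async with AsyncClient("Qwen/Qwen3-0.6B") as client:
...         async def ask(system: str, prompt: str):
...             chat = client.chat(system=system)  # one AsyncChat per task
...             return await chat(prompt, stream=False)
...         helpful, pirate = await asyncio.gather(
...             ask("You are helpful.", "Hello!"),
...             ask("You are a pirate.", "Ahoy!"),
...         )
...         print(helpful, pirate, sep="\n")
>>> asyncio.run(main())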

Parameters
model

Model to load (HuggingFace ID or local path). Creates a default AsyncClient. For custom configuration (GPU layers, API keys, etc.), use AsyncClient instead.

client

Existing AsyncClient to use (for multi-user serving or custom config).

config

Default GenerationConfig for this session.

system

Optional system prompt.

session_id

Optional session identifier for this conversation.

parent_session_id

Optional parent session identifier for forks.

marker

Session marker for storage backends (default: "" = normal/unmarked). Values: "pinned", "archived", "deleted", or "" (normal).

metadata

Optional session metadata dict (tags, UI state, notes).

chat_template

Custom chat template to use.

storage

Storage for messages. Use Database("talu://<path>") for TaluDB persistence (requires session_id).

offline

If True, disallow network access when resolving model URIs.

Example - Basic async usage
>>> chat = AsyncChat("Qwen/Qwen3-0.6B", system="You are helpful.")
>>> response = await chat("What is 2+2?")
>>> print(response)
Example - Remote backend (use AsyncClient for backend config)
>>> async with AsyncClient("gpt-4", base_url="http://localhost:8080/v1", api_key="sk-...") as client:
...     chat = client.chat()
...     response = await chat("Hello!")
Example - Multi-turn async conversation
>>> response = await chat("Hello!")
>>> response = await response.append("Tell me more")
Example - Async streaming
>>> chat = AsyncChat("Qwen/Qwen3-0.6B")
>>> response = await chat("Tell me a story", stream=True)
>>> async for chunk in response:
...     print(chunk, end="", flush=True)

Raises
ValidationError

If both model and client are provided.

MemoryError

If AsyncChat creation fails (insufficient memory).

Example - Multi-user async serving
>>> async with AsyncClient("Qwen/Qwen3-0.6B") as client:
...     user1 = client.chat(system="You are helpful.")
...     user2 = client.chat(system="You are a pirate.")
...     response = await user1("Hello!")
...     response = await user2("Ahoy!")

Quick Reference

Properties

Name Type
chat_template PromptTemplate | None
client Client | AsyncClient | None
items ConversationItems
last_response Response | StreamingResponse | AsyncStreamingResponse | None
max_context_length int | None
messages list
owns_client bool
prompt_id str | None
router Router | None
session_id str | None
source_doc_id str | None
system str | None

Methods

Method Description
__call__() Send a message and get an async streaming respo...
append() Append a message to the conversation.
append_hidden() Append a hidden message to the conversation.
clear() Clear conversation history (keeps system prompt...
close() Close the chat and release resources immediately.
count_tokens() Count tokens in current history or a hypothetic...
fork() Fork this chat to explore alternative conversat...
from_dict() Deserialize from dictionary.
inherit_tags() Copy tags from the prompt document to this conv...
insert() Insert a message at the specified index.
pop() Remove and discard the last message.
preview_prompt() Return the exact formatted prompt that would be...
regenerate() Regenerate the last conversation turn.
remove() Remove message at the specified index.
reset() Reset everything including system prompt.
send() Send a message and get a response (async, non-s...
set_item_parent() Set parent_item_id for an item by index.
set_item_validation_flags() Set structured validation flags for an item by ...
to_dict() Serialize to dictionary.
to_json() Get messages as JSON string (from Zig).

Properties

chat_template: PromptTemplate | None

Get the custom chat template, if any.

client: Client | AsyncClient | None

Get the client used by this chat.

items: ConversationItems

Read-only access to conversation as typed Items.

last_response: Response | StreamingResponse | AsyncStreamingResponse | None

Get the last response from generation.

max_context_length: int | None

Get the model's maximum context length.

messages: list

Read-only view of conversation as standard OpenAI-format messages.

owns_client: bool

True if this Chat owns its Client (standalone mode).

prompt_id: str | None

The prompt document ID for this conversation.

When set, this links the conversation to a prompt/persona document. The prompt document can provide the system prompt content and tags that can be inherited via inherit_tags().

This is stored on the Chat object and used by inherit_tags(). For persistent lineage tracking in session records, use source_doc_id.

router: Router | None

Get the router used by this chat.

session_id: str | None

The session identifier for this conversation.

source_doc_id: str | None

The source document ID for lineage tracking.

Links this conversation to the prompt/persona document that spawned it. Used for tracking which document was used to create the conversation.

system: str | None

Get the system prompt.

Methods

def __call__(
    self,
    message: str | list[dict] | MessageItem | list[MessageItem],
    config: GenerationConfig | None = None,
    stream: bool = True,
    on_token: Callable[[str], None] | None = None,
    response_format: type | dict | Grammar | None = None,
    **kwargs: Any
) -> AsyncResponse | AsyncStreamingResponse

Send a message and get an async streaming response (callable syntax).

This is the primary async way to chat. Call the AsyncChat object directly with your message. By default, returns an AsyncStreamingResponse.

Parameters
message

The user's message.

config

Generation configuration override. Includes structured output settings (schema_strategy, inject_schema_prompt, allow_thinking, max_thinking_tokens).

stream

If True (default), returns AsyncStreamingResponse.

on_token

Optional callback called for each token.

response_format

Dataclass type or JSON schema dict for structured output.

**kwargs

Individual parameter overrides (temperature, max_tokens, etc.).

Returns

AsyncStreamingResponse: If stream=True (default). AsyncResponse: If stream=False.

Example
>>> response = await chat("Tell me a joke")
>>> async for token in response:
...     print(token, end="", flush=True)

def append(
    self,
    role_or_item: str | MessageItem,
    content: str | None = None,
    hidden: bool = False
) -> Self

Append a message to the conversation.

Can be called with either:
  • Two arguments: append(role, content) - role string and content string
  • One argument: append(item) - a MessageItem object
Parameters
role_or_item

Either a role string ("system", "user", "assistant", "developer") or a MessageItem object.

content

Message content (required when first arg is a role string).

hidden

Hide from UI history while keeping in LLM context.

Returns

self, for chaining.

Raises
ValidationError

If role is invalid or arguments are malformed.

StateError

If append fails.

def append_hidden(
    self,
    role: str,
    content: str
) -> Self

Append a hidden message to the conversation.

Hidden messages are included in LLM context but omitted from UI history.

Parameters
role

Message role ("system", "user", "assistant", "developer").

content

Message text content.

Returns

Self for method chaining.

Raises
ValidationError

If role is not one of the valid roles.

StateError

If the append operation fails.

def clear(self) -> Self

Clear conversation history (keeps system prompt and settings).

Returns

self, for chaining.

def close(self) -> None

Close the chat and release resources immediately.

If this AsyncChat created its own internal AsyncClient (via model="..."), the Client and its Engine are closed, freeing memory.

If this AsyncChat uses a shared AsyncClient (via client=...), only the lightweight chat state is freed. The Client stays alive.

Safe to call multiple times.

Example - Explicit cleanup in loops
>>> for model in ["Qwen/0.5B", "Qwen/1.5B", "Qwen/4B"]:
...     chat = AsyncChat(model)
...     print(await chat("Hello"))
...     await chat.close()  # Free memory before loading next model
Example - Context manager (preferred)
>>> async with AsyncChat("Qwen/0.5B") as chat:
...     print(await chat("Hello"))
... # Memory freed automatically here

def count_tokens(self, message: str | None = None) -> int

Count tokens in current history or a hypothetical message.

Parameters
message

Optional message to count. If None, counts current history.

Returns

Token count.

Raises
StateError

If no model configured.

GenerationError

If token counting fails.

def fork(self) -> AsyncChat

Fork this chat to explore alternative conversation paths.

Returns

New AsyncChat with copied state.

Raises
StateError

If message copying fails.
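A brief sketch mirroring the Chat.fork() example:

>>> chat = AsyncChat("Qwen/Qwen3-0.6B")
>>> await chat("I have chicken")
>>> branch = chat.fork()
>>> await branch("Suggest an Asian recipe")  # the original chat's history is unchanged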

def from_dict(
    cls,
    data: dict,
    model: str | None = None
) -> Self

Deserialize from dictionary.

Restores a Chat/AsyncChat from a dict created by to_dict(). Use this to resume conversations from a database or file.

Parameters
data

Dict from to_dict(). If items is provided, full ItemRecord data is loaded; otherwise only OpenAI-format messages are restored.

model

Model to load (HuggingFace ID or local path).

Returns

New instance with restored state.

Raises
StateError

If message loading fails.

def inherit_tags(self) -> None

Copy tags from the prompt document to this conversation.

Requires both prompt_id and session_id to be set, and requires TaluDB storage to be configured.

Raises
StateError

If chat is closed.

ValidationError

If prompt_id or session_id is not set.

IOError

If tag inheritance fails.

def insert(
    self,
    index: int,
    role: str,
    content: str,
    hidden: bool = False
) -> Self

Insert a message at the specified index.

Parameters
index

Position to insert at (0-based).

role

Message role ("system", "user", "assistant", "developer").

content

Message text content.

hidden

Hide from UI history while keeping in LLM context.

Returns

self, for chaining.

Raises
ValidationError

If role is invalid.

StateError

If index is out of bounds or insert fails.

def pop(self) -> Self

Remove and discard the last message.

Returns

self, for chaining.

Raises
StateError

If no messages to remove.

def preview_prompt(
    self,
    add_generation_prompt: bool = True,
    config: GenerationConfig | None = None
) -> str

Return the exact formatted prompt that would be sent to the model.

This is a read-only inspection tool for debugging template logic or verifying system prompts. It does NOT send anything to the engine or affect the conversation state.

Parameters
add_generation_prompt

If True (default), include the assistant turn marker at the end (e.g., "<|im_start|>assistant\n").

config

Optional GenerationConfig. If provided and contains a chat_template, that template will be used instead of the session-level or model default template.

Returns

The formatted prompt string.

Raises
StateError

If no engine is available and no custom template is set.

def regenerate(
    self,
    message: str | None = None,
    config: GenerationConfig | None = None,
    stream: bool = False,
    response_format: type | dict | Grammar | None = None,
    **kwargs: Any
) -> AsyncResponse | AsyncStreamingResponse

Regenerate the last conversation turn.

This method unwinds the conversation to the previous user message and triggers generation again. Use it to retry a response or edit the last user message.

The operation is atomic: it truncates to the point before the last user message and then sends (either the original or new text). This ensures fresh item IDs and timestamps for auditability.

Parameters
message

Optional new text for the user message. If provided: replaces the last user message with this text. If None: retries with the existing user message text.

config

Generation configuration override.

stream

If True, returns AsyncStreamingResponse.

response_format

Dataclass type or JSON schema dict for structured output.

**kwargs

Individual parameter overrides (temperature, max_tokens, etc.)

Returns

AsyncResponse | AsyncStreamingResponse: The new response from regeneration.

Raises
StateError

If no user message exists to regenerate from.

Example
>>> chat = AsyncChat("Qwen/Qwen3-0.6B")
>>> await chat("Tell me a joke")
>>> # Didn't like the joke? Retry:
>>> await chat.regenerate()
>>> # Or edit and retry:
>>> await chat.regenerate(message="Tell me a better joke")

def remove(self, index: int) -> Self

Remove message at the specified index.

Parameters
index

Index of message to remove (0-based).

Returns

self, for chaining.

Raises
StateError

If index is out of bounds.

def reset(self) -> Self

Reset everything including system prompt.

Returns

self, for chaining.

def send(
    self,
    message: str | list[dict] | MessageItem | list[MessageItem],
    config: GenerationConfig | None = None,
    tools: list[Callable[..., Any]] | None = None,
    stream: bool = False,
    on_token: Callable[[str], None] | None = None,
    response_format: type | dict | Grammar | None = None,
    **kwargs: Any
) -> AsyncResponse | AsyncStreamingResponse

Send a message and get a response (async, non-streaming by default).

Parameters
message

The user's message.

config

Generation configuration override. Includes structured output settings (schema_strategy, inject_schema_prompt, allow_thinking, max_thinking_tokens).

tools

Optional list of @tool-decorated functions to enable tool calling.

stream

If True, returns AsyncStreamingResponse. If False (default), AsyncResponse.

on_token

Optional callback called for each token.

response_format

Dataclass type or JSON schema dict for structured output.

**kwargs

Individual parameter overrides (temperature, max_tokens, etc.).

Returns

AsyncResponse: If stream=False (default). AsyncStreamingResponse: If stream=True.

Raises
StateError

If no router is available (AsyncChat created without model/client).

ValidationError

If an unknown generation parameter is passed.

StructuredOutputError

If response_format schema setup fails.

Example
>>> response = await chat.send("What is 2+2?")
>>> print(response)
4
>>> response = await response.append("Why?")

def set_item_parent(
    self,
    item_index: int,
    parent_item_id: int | None
) -> None

Set parent_item_id for an item by index.

Raises
StateError

If the operation fails.

def set_item_validation_flags(
    self,
    item_index: int,
    json_valid: bool,
    schema_valid: bool,
    repaired: bool = False
) -> None

Set structured validation flags for an item by index.

Use this after structured parsing/validation to mark JSON/schema validity.

Raises
StateError

If the operation fails.

def to_dict(self) -> dict

Serialize to dictionary.

def to_json(self) -> str

Get messages as JSON string (from Zig).

Returns

JSON string of messages array in OpenAI Completions format. This is an interchange format and does not include storage-only metadata.


class AsyncResponse

AsyncResponse(
    self,
    text: str = '',
    tokens: list[int] | None = None,
    finish_reason: str | None = None,
    usage: Usage | None = None,
    timings: Timings | None = None,
    model: str | None = None,
    logprobs: list[TokenLogprob] | None = None,
    tool_calls: list[ToolCall] | None = None,
    chat: AsyncChat | None = None,
    metadata: ResponseMetadata | None = None,
    _response_format: type | dict | Grammar | None = None,
    _stream_mode: bool = False,
    _content: list[ContentPart] | None = None,
    _prompt: str | None = None
)

Async completed generation result.

Returned by AsyncChat for non-streaming generation. Contains the complete generated text and metadata. Behaves like a string for simple use but exposes rich metadata when needed.

The append() method is async and must be awaited.

Attributes
text

The generated text content.

tokens

List of generated token IDs.

finish_reason

Why generation stopped (eos_token, length, stop_sequence).

usage

Token usage statistics.

timings

Generation timing breakdown.

model

Model identifier that generated this response.

logprobs

Token log probabilities (if requested).

Example
>>> response = await chat.send("Hello!")
>>> print(response)  # Works like a string
>>> print(f"Used {response.usage.total_tokens} tokens")
Example - Multi-turn
>>> response = await chat.send("What is 2+2?")
>>> response = await response.append("Why?")
>>> response = await response.append("Are you sure?")

Quick Reference

Properties

Name Type
chat Chat | None
content list[ContentPart]
finish_reason str
logprobs list[TokenLogprob] | None
model str | None
parsed Any
prompt str | None
text str
timings Timings | None
tokens list[int]
tool_calls list[ToolCall] | None
usage Usage | None

Methods

Method Description
append() Continue the conversation with a follow-up mess...
endswith() Check if text ends with suffix.
lower() Return text in lowercase.
replace() Replace occurrences in text.
split() Split text.
startswith() Check if text starts with prefix.
strip() Return text with leading/trailing chars removed.
submit_tool_result() Submit a tool result and continue generation (a...
to_dict() Convert response to a JSON-serializable diction...
upper() Return text in uppercase.

Properties

chat: Chat | None

The Chat that generated this response.

content: list[ContentPart]

Structured content parts for multimodal output symmetry.

Returns a list of content parts, enabling symmetric handling of input and output. For text-only responses, this returns [OutputText(text=...)]. Future multimodal models will return additional part types (OutputImage, etc.).

This property is the source of truth for response content. The .text property is a convenience that concatenates all text parts.

Returns

List of content parts (currently OutputText for text responses).

Example
>>> response = await chat.send("Hello!")
>>> for part in response.content:
...     if part.type == ContentType.OUTPUT_TEXT:
...         print(part.text)
Note

Currently only returns OutputText. Future versions may include OutputImage, OutputAudio, etc. as models evolve.

finish_reason: str

Why generation stopped.

logprobs: list[TokenLogprob] | None

Token log probabilities (if requested).

model: str | None

Model identifier that generated this response.

parsed: Any

Parse and validate the response against the response_format schema.

If a response_format was provided during generation, this property parses the response text as JSON and validates/hydrates it into the specified type (dataclass or Pydantic model).

Returns

The parsed and validated response object, or None if no response_format was specified.

Raises
IncompleteJSONError

If finish_reason is "length" and JSON is malformed.

json.JSONDecodeError

If the response text is not valid JSON.

SchemaValidationError

If the parsed data doesn't match the schema.
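
A minimal sketch, assuming a dataclass is passed as response_format (the Person type, its fields, and the printed values are illustrative):
>>> from dataclasses import dataclass
>>> @dataclass
... class Person:
...     name: str
...     age: int
>>> response = await chat.send("Extract: Alice is 30.", response_format=Person)
>>> person = response.parsed
>>> print(person.name, person.age)  # e.g. Alice 30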

prompt: str | None

The fully rendered prompt sent to the model (audit trail).

Contains the exact string that was fed to the model engine after all templating, system prompt injection, and formatting was applied. Useful for debugging template issues and understanding exactly what the model saw.

Returns

The rendered prompt string, or None if not available.

Example
>>> response = await chat.send("Hello!")
>>> print(response.prompt)
<|im_start|>system
You are a helpful assistant.
<|im_end|>
<|im_start|>user
Hello!
<|im_end|>
<|im_start|>assistant
Note

Only available for responses generated through Chat. May be None for responses from remote APIs or when prompt wasn't captured.

text: str

The generated text content.

timings: Timings | None

Generation timing breakdown.

tokens: list[int]

List of generated token IDs.

tool_calls: list[ToolCall] | None

Tool calls requested by the model (if any).

usage: Usage | None

Token usage statistics.

Methods

def append(
    self,
    message: str,
    **kwargs: Any
) -> AsyncResponse | AsyncStreamingResponse

Continue the conversation with a follow-up message (async).

This is the async way to have multi-turn conversations. The append uses the same AsyncChat that generated this response. Must be awaited.

Auto-Fork Behavior: If the conversation has moved past this response, append() automatically forks the conversation and truncates it back to this point before sending the new message. See Response.append() for details.

The append automatically inherits streaming mode from the original response.

Parameters
message

The follow-up message to send.

**kwargs

Generation parameters (temperature, max_tokens, etc.).

Returns

AsyncResponse if original was non-streaming, AsyncStreamingResponse if streaming.

Raises
StateError

If this response has no associated AsyncChat.

Example
>>> response = await chat.send("What is 2+2?")
>>> response = await response.append("Why?")
>>> response = await response.append("Are you sure?")

def endswith(self, suffix: str) -> bool

Check if text ends with suffix.

def lower(self) -> str

Return text in lowercase.

def replace(
    self,
    old: str,
    new: str,
    count: int = -1
) -> str

Replace occurrences in text.

def split(
    self,
    sep: str | None = None,
    maxsplit: int = -1
) -> list[str]

Split text.

def startswith(self, prefix: str) -> bool

Check if text starts with prefix.

def strip(self, chars: str | None = None) -> str

Return text with leading/trailing chars removed.

def submit_tool_result(
    self,
    tool_call_id: str,
    result: Any
) -> AsyncResponse

Submit a tool result and continue generation (async).

Parameters
tool_call_id

The ID from the tool call (tool_call.id).

result

The result to send back (will be JSON serialized if not str).

Returns

New AsyncResponse from continued generation.

Raises
StateError

If no AsyncChat session is attached.
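
A minimal sketch of a manual tool round-trip, assuming get_weather is a @tool-decorated function passed via tools= (the function name and query are illustrative):
>>> response = await chat.send("Weather in Paris?", tools=[get_weather])
>>> if response.tool_calls:
...     call = response.tool_calls[0]
...     result = get_weather(**call.function.arguments_parsed())
...     response = await response.submit_tool_result(call.id, result)
>>> print(response)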

def to_dict(self) -> dict[str, Any]

Convert response to a JSON-serializable dictionary.

This solves the "serialization trap" where Response acts like a string but isn't directly JSON serializable. Use this for API responses, logging, or any context requiring JSON.

Returns

Dict with text, finish_reason, model, and usage (if available).

Example - FastAPI endpoint
>>> @app.post("/chat")
>>> async def chat_endpoint(message: str):
...     response = await chat(message)
...     return response.to_dict()  # JSON serializable
Example - Logging
>>> import json
>>> response = await chat.send("Hello!")
>>> json.dumps(response.to_dict())  # Works!
Example - Custom response structure
>>> result = {
...     "success": True,
...     "data": response.to_dict(),
... }

def upper(self) -> str

Return text in uppercase.


class AsyncStreamingResponse

AsyncStreamingResponse(
    self,
    async_stream_iterator: AsyncIterator,
    on_token: Callable[[str], None] | None = None,
    on_complete: Callable[[str], None] | None = None,
    tokens: list[int] | None = None,
    finish_reason: str | None = None,
    usage: Usage | None = None,
    timings: Timings | None = None,
    model: str | None = None,
    logprobs: list[TokenLogprob] | None = None,
    tool_calls: list[ToolCall] | None = None,
    chat: AsyncChat | None = None,
    metadata: ResponseMetadata | None = None,
    _response_format: type | dict | Grammar | None = None,
    _stream_mode: bool = True,
    _hooks: HookManager | None = None,
    _generation_start_time: float | None = None,
    _prompt: str | None = None
)

Async streaming generation result that yields tokens incrementally.

Returned when calling chat.send(stream=True) on AsyncChat. Use async for to receive tokens in real-time. Text accumulates in .text as you iterate.

Concurrency

Single-consumer. Do not iterate from multiple tasks.

Attributes
text

The accumulated text (grows during iteration).

tokens

List of generated token IDs (populated after iteration).

finish_reason

Why generation stopped (available after iteration).

usage

Token usage statistics (available after iteration).

timings

Generation timing breakdown (available after iteration).

model

Model identifier that generated this response.

Example
>>> response = await chat.send("Tell me a joke", stream=True)
>>> async for token in response:
...     print(token, end="", flush=True)
>>> print()
>>> print(f"Full text: {response.text}")

Quick Reference

Properties

Name Type
chat Chat | None
content list[ContentPart]
finish_reason str
logprobs list[TokenLogprob] | None
model str | None
prompt str | None
text str
timings Timings | None
tokens list[int]
tool_calls list[ToolCall] | None
usage Usage | None

Methods

Method Description
append() Continue the conversation with a follow-up mess...
endswith() Check if text ends with suffix.
lower() Return text in lowercase.
replace() Replace occurrences in text.
split() Split text.
startswith() Check if text starts with prefix.
strip() Return text with leading/trailing chars removed.
to_dict() Convert response to a JSON-serializable diction...
upper() Return text in uppercase.

Properties

chat: Chat | None

The Chat that generated this response.

content: list[ContentPart]

Structured content parts for multimodal output symmetry.

Returns a list of content parts, enabling symmetric handling of input and output. For text-only responses, this returns [OutputText(text=...)]. Future multimodal models will return additional part types (OutputImage, etc.).

This property is the source of truth for response content. The .text property is a convenience that concatenates all text parts.

Returns

List of content parts (currently OutputText for text responses).

Example
>>> response = await chat.send("Hello!", stream=True)
>>> async for _ in response:
...     pass  # Consume the stream first
>>> for part in response.content:
...     if part.type == ContentType.OUTPUT_TEXT:
...         print(part.text)
Note

Currently only returns OutputText. Future versions may include OutputImage, OutputAudio, etc. as models evolve.

finish_reason: str

Why generation stopped.

logprobs: list[TokenLogprob] | None

Token log probabilities (if requested).

model: str | None

Model identifier that generated this response.

prompt: str | None

The fully rendered prompt (available after iteration completes).

For async streaming responses, the prompt is captured after iteration finishes since messages are added during streaming. Access this property after consuming the stream.

Returns

The rendered prompt string, or None if iteration hasn't completed or the prompt couldn't be captured.

text: str

The generated text content.

For AsyncStreamingResponse, this returns the text accumulated so far. To get the complete text, ensure you have consumed the stream first by iterating with async for token in response.

Note

Unlike sync StreamingResponse, AsyncStreamingResponse cannot auto-drain because it would require an async context. If you need the full text, iterate over the response first:

async for _ in response:
    pass
full_text = response.text
Returns

The accumulated text content (partial if stream not exhausted).

timings: Timings | None

Generation timing breakdown.

tokens: list[int]

List of generated token IDs.

tool_calls: list[ToolCall] | None

Tool calls requested by the model (if any).

usage: Usage | None

Token usage statistics.

Methods

def append(
    self,
    message: str,
    **kwargs: Any
) -> AsyncStreamingResponse

Continue the conversation with a follow-up message (async streaming).

Returns AsyncStreamingResponse (inherits streaming mode). Must be awaited.

See Response.append() for full documentation including auto-fork behavior.

Parameters
message

The follow-up message text.

**kwargs

Generation overrides (temperature, max_tokens, etc.).

Raises
StateError

If this response has no associated AsyncChat.

Example
>>> response = await chat.send("Hello", stream=True)
>>> async for token in response:
...     print(token, end="")
>>> response2 = await response.append("Continue")
>>> async for token in response2:
...     print(token, end="")

def endswith(self, suffix: str) -> bool

Check if text ends with suffix.

def lower(self) -> str

Return text in lowercase.

def replace(
    self,
    old: str,
    new: str,
    count: int = -1
) -> str

Replace occurrences in text.

def split(
    self,
    sep: str | None = None,
    maxsplit: int = -1
) -> list[str]

Split text.

def startswith(self, prefix: str) -> bool

Check if text starts with prefix.

def strip(self, chars: str | None = None) -> str

Return text with leading/trailing chars removed.

def to_dict(self) -> dict[str, Any]

Convert response to a JSON-serializable dictionary.

This solves the "serialization trap" where Response acts like a string but isn't directly JSON serializable. Use this for API responses, logging, or any context requiring JSON.

Returns

Dict with text, finish_reason, model, and usage (if available).

Example - FastAPI endpoint
>>> @app.post("/chat")
>>> async def chat_endpoint(message: str):
...     response = await chat(message)
...     return response.to_dict()  # JSON serializable
Example - Logging
>>> import json
>>> response = await chat.send("Hello!", stream=True)
>>> json.dumps(response.to_dict())  # Works!
Example - Custom response structure
>>> result = {
...     "success": True,
...     "data": response.to_dict(),
... }

def upper(self) -> str

Return text in uppercase.


class ConversationItems

ConversationItems(
    self,
    lib: Any,
    conversation_ptr: int
)

Read-only view into conversation history.

Reads items directly from the Conversation C API, providing typed access without the Messages adapter layer. Items are read on-demand using zero-copy access to the underlying storage.

Example
>>> # Access items via chat.items
>>> for item in chat.items:
...     if isinstance(item, MessageItem):
...         print(f"{item.role.name}: {item.text}")
...     elif isinstance(item, FunctionCallItem):
...         print(f"Tool call: {item.name}({item.arguments})")

Quick Reference

Properties

Name Type
first ConversationItem | None
last ConversationItem | None
system str | None

Methods

Method Description
count() S.count(value) -> integer -- return number of o...
filter_by_role() Get all message items with a specific role.
filter_by_type() Get all items of a specific type.
index() S.index(value, [start, [stop]]) -> integer -- r...

Properties

first: ConversationItem | None

Get the first item, or None if empty.

last: ConversationItem | None

Get the last item, or None if empty.

system: str | None

Get the system message content, or None if no system message.

Methods

def count(self, value)

S.count(value) -> integer -- return number of occurrences of value

def filter_by_role(self, role: MessageRole) -> list[MessageItem]

Get all message items with a specific role.

Parameters
role

The message role to filter by.

def filter_by_type(self, item_type: type[ConversationItem]) -> list[ConversationItem]

Get all items of a specific type.

Parameters
item_type

The ConversationItem subclass to filter by.

def index(
    self,
    value,
    start = 0,
    stop = None
)

S.index(value, [start, [stop]]) -> integer -- return first index of value. Raises ValueError if the value is not present.

Supporting start and stop arguments is optional, but recommended.
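
A minimal sketch of the filter helpers (MessageRole.USER is assumed to be the user-role enum member; FunctionCallItem is the tool-call item type shown in the class example above):
>>> user_turns = chat.items.filter_by_role(MessageRole.USER)
>>> calls = chat.items.filter_by_type(FunctionCallItem)
>>> print(len(user_turns), len(calls))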


class Hook

Base class for generation hooks.

Implement any subset of these methods to receive callbacks during generation. All methods have default no-op implementations, so you only need to override the ones you care about.

Methods are called in this order

1. on_generation_start - Before Zig generation begins
2. on_first_token - When first token arrives (streaming) or N/A (non-streaming)
3. on_generation_end - After generation completes (success or error)

Thread Safety

Hook methods may be called from different threads for concurrent generations. If your hook maintains state, ensure it's thread-safe.
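
As a sketch, a hook that records end-to-end generation latency (the print sink is illustrative; note that a single start-time attribute like this is not safe for concurrent generations):
>>> import time
>>> class LatencyHook(Hook):
...     def on_generation_start(self, chat, input_text, config=None):
...         self._t0 = time.perf_counter()  # Not thread-safe across concurrent generations
...     def on_generation_end(self, chat, response, error=None):
...         elapsed_ms = (time.perf_counter() - self._t0) * 1000
...         print(f"generation finished in {elapsed_ms:.1f} ms (error={error is not None})")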

Quick Reference

Methods

Method Description
on_first_token() Handle first token event (streaming only).
on_generation_end() Handle generation end event (success or error).
on_generation_start() Handle generation start event.

Methods

def on_first_token(
    self,
    chat: Chat | AsyncChat,
    time_ms: float
) -> None

Handle first token event (streaming only).

This is the Time-To-First-Token (TTFT) measurement point, critical for perceived latency in interactive applications.

Parameters
chat

The Chat instance.

time_ms

Milliseconds since generation_start.

Note

Only called for streaming responses. For non-streaming, TTFT is effectively the same as total latency.

Example
>>> def on_first_token(self, chat, time_ms):
...     metrics.histogram("llm.ttft", time_ms)

def on_generation_end(
    self,
    chat: Chat | AsyncChat,
    response: Response | None,
    error: Exception | None = None
) -> None

Handle generation end event (success or error).

Parameters
chat

The Chat instance.

response

The Response object (if successful), or None if error.

error

The exception (if generation failed), or None if successful.

Example
>>> def on_generation_end(self, chat, response, error=None):
...     if error:
...         metrics.counter("llm.errors", 1)
...     else:
...         metrics.counter("llm.tokens", response.usage.total_tokens)

def on_generation_start(
    self,
    chat: Chat | AsyncChat,
    input_text: str,
    config: Any = None
) -> None

Handle generation start event.

Parameters
chat

The Chat instance initiating generation.

input_text

The user's input message.

config

The GenerationConfig for this request (if any).

Example
>>> def on_generation_start(self, chat, input_text, config=None):
...     self.start_time = time.perf_counter()
...     self.input_tokens = len(input_text.split())  # Rough estimate

class HookManager

HookManager(self, hooks: list[Hook] | None = None)

Hook dispatcher for generation lifecycle events.

Used internally by Client to dispatch hook calls. Users register hooks on the Client rather than interacting with this class directly.

Quick Reference

Properties

Name Type
hooks list[Hook]

Methods

Method Description
add() Add a hook to the manager.
dispatch_end() Dispatch on_generation_end to all hooks.
dispatch_first_token() Dispatch on_first_token to all hooks.
dispatch_start() Dispatch on_generation_start to all hooks.
remove() Remove a hook from the manager.

Properties

hooks: list[Hook]

Return the list of registered hooks.

Methods

def add(self, hook: Hook) -> None

Add a hook to the manager.

Parameters
hook

Hook instance to register.

def dispatch_end(
    self,
    chat: Chat | AsyncChat,
    response: Response | None,
    error: Exception | None = None
) -> None

Dispatch on_generation_end to all hooks.

Parameters
chat

The Chat or AsyncChat instance.

response

The completed response, or None on error.

error

The exception that occurred, if any.

def dispatch_first_token(
    self,
    chat: Chat | AsyncChat,
    time_ms: float
) -> None

Dispatch on_first_token to all hooks.

Parameters
chat

The Chat or AsyncChat instance.

time_ms

Time to first token in milliseconds.

def dispatch_start(
    self,
    chat: Chat | AsyncChat,
    input_text: str,
    config: Any = None
) -> None

Dispatch on_generation_start to all hooks.

Parameters
chat

The Chat or AsyncChat instance.

input_text

The user's input text.

config

Generation configuration, if any.

def remove(self, hook: Hook) -> None

Remove a hook from the manager.

Parameters
hook

Hook instance to unregister.


class Token

Single token from a streaming response.

Token is returned during streaming iteration. It behaves exactly like a string for casual use (print, concatenation, etc.) but also carries per-token metadata when logprobs, token IDs, or stop reason detection are needed.

Attributes
id

The token ID from the tokenizer vocabulary.

logprob

Log probability of this token (if logprobs were requested), or None.

is_special

True if this is a special token (EOS, BOS, etc.).

finish_reason

If this is the last token, why generation stopped. Otherwise None. Possible values: "eos_token", "length", "stop_sequence", "tool_calls".

Example
>>> for token in chat("Hello", stream=True):
...     print(token, end="", flush=True)
Example (with metadata)
>>> for token in chat("Hello", stream=True):
...     if token.logprob is not None and token.logprob < -5.0:
...         ui.highlight_uncertain(token)
...     print(token, end="")
Note

Token instances are immutable (like str). Metadata is set at construction and cannot be modified afterward.


class Usage

Usage(
    self,
    prompt_tokens: int,
    completion_tokens: int,
    total_tokens: int
)

Token usage statistics.

Attributes
prompt_tokens

Tokens in the input prompt.

completion_tokens

Tokens in the generated response.

total_tokens

Total tokens (prompt + completion).


class Timings

Timings(
    self,
    prefill_ms: float,
    generation_ms: float,
    tokens_per_second: float
)

Generation timing breakdown.

Provides detailed performance metrics for generation, useful for profiling, optimization, and monitoring latency in production.

Attributes
prefill_ms

Time to process the prompt (milliseconds). This is the "time to first token" - how long before generation starts.

generation_ms

Time to generate all tokens (milliseconds). This is the decode phase - actual token generation time.

tokens_per_second

Generation throughput (tokens/sec). Calculated as completion_tokens / (generation_ms / 1000).

Example
>>> response = chat("Tell me a story")
>>> if response.timings:
...     print(f"Prefill: {response.timings.prefill_ms:.1f}ms")
...     print(f"Generation: {response.timings.generation_ms:.1f}ms")
...     print(f"Speed: {response.timings.tokens_per_second:.1f} tok/s")

Quick Reference

Methods

Method Description
from_ns() Create Timings from nanosecond values.

Methods

def from_ns(
    cls,
    prefill_ns: int,
    generation_ns: int,
    token_count: int
) -> Timings

Create Timings from nanosecond values.

Parameters
prefill_ns

Prefill time in nanoseconds.

generation_ns

Generation time in nanoseconds.

token_count

Number of tokens generated.

Returns

Timings instance with millisecond values and throughput.
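
A minimal sketch of the conversion (values are illustrative): 120 ms of prefill, 800 ms of generation, and 40 tokens works out to 50 tokens/sec per the formula above.
>>> t = Timings.from_ns(prefill_ns=120_000_000, generation_ns=800_000_000, token_count=40)
>>> round(t.tokens_per_second, 1)  # 40 tokens / 0.8 s
50.0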


class FinishReason

Constants for generation stop reasons.

Attributes
EOS_TOKEN

End-of-sequence token generated.

LENGTH

Maximum token limit reached.

STOP_SEQUENCE

User-defined stop sequence matched.

TOOL_CALLS

Model requested tool execution.

CANCELLED

Request was cancelled (client disconnect, stop flag set).
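
A minimal sketch of checking for truncation (max_tokens is passed as a generation override; comparing against the constant assumes it matches the string stored in Response.finish_reason):
>>> response = chat("Write a long essay", max_tokens=32)
>>> if response.finish_reason == FinishReason.LENGTH:
...     print("Hit the token limit - consider raising max_tokens")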


class TokenLogprob

TokenLogprob(
    self,
    token: int,
    token_str: str,
    logprob: float,
    top_logprobs: list[tuple[int, str, float]] | None = None
)

Log probability for a single token.

Attributes
token

Token ID.

token_str

Token as string.

logprob

Log probability.

top_logprobs

Alternative tokens at this position.


class ResponseMetadata

ResponseMetadata(
    self,
    finish_reason: str,
    schema_tokens: int = 0,
    schema_injection: str | None = None,
    grammar_gbnf: str | None = None,
    grammar_trace: list[str] | None = None,
    prefill_success: bool | None = None
)

Generation metadata and debug information.


class ResponseFormat

ResponseFormat(
    self,
    type: str = 'text',
    json_schema: dict | None = None
)

Structured output format specification.

Used to constrain generation to produce valid JSON matching a schema.

Attributes
type

The format type ("text" or "json_object").

json_schema

JSON Schema dict for structured output (when type="json_object").

Example
>>> config = GenerationConfig(
...     response_format=ResponseFormat(
...         type="json_object",
...         json_schema={"type": "object", "properties": {"name": {"type": "string"}}}
...     )
... )

class ToolCall

ToolCall(
    self,
    id: str,
    type: str,
    function: ToolCallFunction,
    _func: Callable[..., Any] | None = None
)

Tool call requested by the model.

Follows the OpenAI tool call format for compatibility with agent frameworks and tool-calling workflows.

Attributes
id

Unique identifier for this tool call.

type

Always "function" for function calls.

function

The function details (name and arguments).

Example
>>> if response.tool_calls:
...     for tool in response.tool_calls:
...         print(f"Call: {tool.function.name}")
...         args = tool.function.arguments_parsed()
...         result = execute_tool(tool.function.name, args)

Quick Reference

Properties

Name Type
arguments str
name str

Methods

Method Description
create() Create a ToolCall with the given parameters.
execute() Execute the tool call by invoking the mapped Py...
execute_async() Execute the tool call asynchronously.

Properties

arguments: str

Convenience access to function arguments.

name: str

Convenience access to function name.

Methods

def create(
    cls,
    id: str,
    name: str,
    arguments: str
) -> ToolCall

Create a ToolCall with the given parameters.

def execute(self) -> Any

Execute the tool call by invoking the mapped Python function.

Returns

The return value of the tool function.

Raises
ToolExecutionError

If no function is mapped to this tool call.

def execute_async(self) -> Any

Execute the tool call asynchronously.

Awaits coroutine functions directly. Runs sync functions in an executor to avoid blocking the event loop.

Returns

The return value of the tool function.

Raises
ToolExecutionError

If no function is mapped to this tool call.
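
A minimal sketch of dispatching mapped tool calls (assumes the tools were @tool-decorated and passed to send(), so a Python function is mapped to each call):
>>> for call in response.tool_calls or []:
...     print(call.name, call.arguments)
...     result = call.execute()  # Sync invocation of the mapped function
>>> # In async code, prefer: result = await call.execute_async()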


class ToolCallFunction

ToolCallFunction(
    self,
    name: str,
    arguments: str
)

Function name and arguments within a tool call.

Attributes
name

Name of the function to call.

arguments

JSON string of arguments to pass.

Quick Reference

Methods

Method Description
arguments_parsed() Parse arguments as dict

Methods

def arguments_parsed(self) -> dict

Parse arguments as dict. Returns empty dict on parse failure.


class ToolResult

ToolResult(
    self,
    tool_call_id: str,
    content: str,
    is_error: bool = False
)

Result of a tool execution.

Added back to the conversation history so the model can incorporate the tool output in its next response.

Attributes
tool_call_id

ID of the tool call this is responding to.

content

The tool's output/result.

is_error

Whether this result represents an error.

Example
>>> # Execute tool and add result
>>> result = execute_tool(tool.function.name, tool.function.arguments_parsed())
>>> # Tool results are added to the conversation automatically during generation

Quick Reference

Methods

Method Description
to_message() Convert to OpenAI message format.

Methods

def to_message(self) -> dict

Convert to OpenAI message format.
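
A minimal sketch (the tool_call_id and content are illustrative; the exact dict keys follow the OpenAI tool-message convention and are an assumption here):
>>> result = ToolResult(tool_call_id="call_1", content="72F and sunny")
>>> msg = result.to_message()  # e.g. a dict with role/tool_call_id/content keys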


class ToolState

ToolState(
    self,
    status: str,
    input: dict | None = None,
    title: str | None = None,
    output: str | None = None,
    error: str | None = None,
    metadata: dict | None = None,
    time_start: float | None = None,
    time_end: float | None = None
)

Tool execution state for streaming UIs.

Provides state tracking for live UI updates during tool execution.

Attributes
status

Current status (pending, running, completed, error).

title

Human-readable title for UI display.

input

Parsed input arguments (dict, not JSON string).

output

Tool result (when completed).

error

Error message (when error).

metadata

Additional metadata for UI display.

time_start

When execution started (Unix timestamp).

time_end

When execution ended (Unix timestamp).

Example - Streaming updates
>>> # Tool starts
>>> state = ToolState(status="running", title="Searching...", input={"query": "python"})
>>>
>>> # Tool completes
>>> state = ToolState(
...     status="completed",
...     title="Found 10 results",
...     input={"query": "python"},
...     output="1. Python docs\n2. ...",
... )

class ToolStatus

Constants for tool execution status.