Chat
Chat module - Stateful multi-turn conversation sessions
Classes
| Class | Description |
|---|---|
| Chat | Stateful multi-turn chat session. |
| Response | Completed generation result. |
| StreamingResponse | Streaming generation result that yields tokens incrementally. |
| AsyncChat | Async stateful multi-turn chat session. |
| AsyncResponse | Async completed generation result. |
| AsyncStreamingResponse | Async streaming generation result that yields tokens incrementally. |
| ConversationItems | Read-only view into conversation history. |
| Hook | Base class for generation hooks. |
| HookManager | Hook dispatcher for generation lifecycle events. |
| Token | Single token from a streaming response. |
| Usage | Token usage statistics. |
| Timings | Generation timing breakdown. |
| FinishReason | Constants for generation stop reasons. |
| TokenLogprob | Log probability for a single token. |
| ResponseMetadata | Generation metadata and debug information. |
| ResponseFormat | Structured output format specification. |
| ToolCall | Tool call requested by the model. |
| ToolCallFunction | Function name and arguments within a tool call. |
| ToolResult | Result of a tool execution. |
| ToolState | Tool execution state for streaming UIs. |
| ToolStatus | Constants for tool execution status. |
class Chat
Chat(
self,
model: str | None = None,
client: Client | None = None,
config: GenerationConfig | None = None,
system: str | None = None,
profile: Profile | None = None,
session_id: str | None = None,
parent_session_id: str | None = None,
group_id: str | None = None,
ttl_ts: int | None = None,
marker: str = '',
metadata: dict | None = None,
source_doc_id: str | None = None,
prompt_id: str | None = None,
chat_template: str | PromptTemplate | None = None,
storage: Database | None = None,
offline: bool = False,
_defer_session_update: bool = False
)
Stateful multi-turn chat session.
Chat is the primary interface for talu. Pass a model string to get a fully autonomous chat that handles everything, or pass a client for efficient multi-user serving.
Separation of Concerns
- Chat manages session state: conversation history, system prompt, templates
- Client manages infrastructure: model loading, GPU layers, API keys, threading
For custom hardware or backend configuration, create a Client first:
client = Client("model", gpu_layers=20, api_key="...")
chat = client.chat(system="You are helpful.")
Concurrency
Not intended for concurrent use. Create one Chat per thread/task. Sharing across threads can interleave message history unpredictably.
Creating multiple Chat instances for the same model is efficient - they share the underlying engine. Only the message history is per-Chat.
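A minimal sketch of the recommended pattern, with one shared Client and a separate Chat per worker thread. This assumes the shared Client tolerates concurrent requests, as the multi-user serving examples below imply; the prompts are illustrative:
>>> import threading
>>> client = Client("Qwen/Qwen3-0.6B")
>>> def worker(prompt):
...     chat = client.chat(system="You are helpful.")  # one Chat per thread
...     print(chat(prompt, stream=False))
>>> threads = [threading.Thread(target=worker, args=(p,)) for p in ("Hi", "Hello")]
>>> for t in threads: t.start()
>>> for t in threads: t.join()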
Parameters
model: Model to load (HuggingFace ID or local path). Creates a default Client. For custom configuration (GPU layers, API keys, etc.), use Client instead.
client: Existing Client to use (for multi-user serving or custom config).
config: Default GenerationConfig for this session. If provided, these settings are used for all send/stream calls unless overridden.
system: Optional system prompt. Stored as the first message with role="system" (accessible via messages[0]). This follows the HuggingFace chat template convention where system prompts are part of the messages list, not a separate template variable.
profile: Optional storage profile. When provided, chat history is persisted under ~/.talu/db/<profile>/. If session_id is not provided, a UUIDv4 session ID is generated automatically.
session_id: Optional session identifier for this conversation. Used by storage backends to group messages by session. When persisting to TaluDB (when using talu://), session_id is hashed to SESSION_HASH for efficient Jump Reads during session restoration.
parent_session_id: Optional parent session identifier for forks.
marker: Session marker for storage backends (default: "" = normal/unmarked). Values: "pinned", "archived", "deleted", or "" (normal).
metadata: Optional session metadata dict (tags, UI state, notes).
chat_template: Custom chat template to use instead of the model's default. Can be a PromptTemplate object or a template string. If None (default), uses the model's chat_template from tokenizer_config.json.
storage: Storage for messages. Defaults to Database(":memory:"). Use Database("talu://<path>") for TaluDB persistence (requires session_id). Cannot be combined with profile.
offline: If True, disallow network access when resolving model URIs.
Attributes
config: The session's GenerationConfig. This is the single source of truth for generation parameters. Can be read or replaced directly.
messages: List-like access to all messages (including system prompt). The system prompt (if set) appears at index 0 with role="system".
session_id: The session identifier for this conversation, or None.
client: The Client used for this chat (if any).
router: The Router used for generation (if any).
chat_template: The PromptTemplate used for formatting prompts.
Raises
ValidationError: If both model and client are provided.
MemoryError: If Chat creation fails (insufficient memory).
Provide either `model` OR `client`, not both. If neither is provided, Chat works as a lightweight state container (for advanced use).
Configuration Precedence
When calling send/stream, parameters are resolved in this order:
1. **kwargs (e.g., temperature=0.1) - highest priority
2. config parameter (explicit GenerationConfig object)
3. self.config (session default) - lowest priority
Example - Simple chat
>>> chat = Chat("Qwen/Qwen3-0.6B", system="You are helpful.")
>>> response = chat("What is 2+2?")
>>> print(response)
4
Example - Remote backend (use Client for backend config)
>>> client = Client("gpt-4", base_url="http://localhost:8080/v1", api_key="sk-...")
>>> chat = client.chat()
>>> response = chat("Hello!")
Example - Local backend with GPU offload (use Client for hardware config)
>>> client = Client("Qwen/Qwen3-0.6B", gpu_layers=20, num_threads=4)
>>> chat = client.chat()
Example - Multi-turn conversation
>>> response = chat("What is Python?")
>>> response = response.append("What is it used for?")
>>> response = response.append("Give me an example")
Example - Streaming
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> response = chat("Tell me a story", stream=True)
>>> for token in response:
... print(token, end="", flush=True)
Example - Multi-user serving
>>> client = Client("Qwen/Qwen3-0.6B")
>>> user1 = client.chat(system="You are helpful.")
>>> user2 = client.chat(system="You are a pirate.")
>>> response = user1("Hello!")
>>> response = user2("Ahoy!")
Example - Using GenerationConfig
>>> config = GenerationConfig(temperature=0.7, max_tokens=100)
>>> chat = Chat("model", config=config)
>>> print(chat.config.temperature) # 0.7
>>> chat.send("Solve: 2+2") # Uses temp=0.7 automatically
Example - Per-call overrides with kwargs (preferred)
>>> chat = Chat("model", config=GenerationConfig(temperature=0.7))
>>> chat.send("Solve math", temperature=0.1) # Uses 0.1 for this call only
Example - Per-call overrides with config object
>>> chat.send("Complex task", config=GenerationConfig(top_k=20))
Example - Combined overrides (kwargs win)
>>> chat.send("Hello", config=GenerationConfig(temperature=0.5), temperature=0.1)
>>> # Uses temperature=0.1 (kwargs override config parameter)
Example - Message access
>>> chat = Chat(system="You are helpful.")
>>> chat.items[0] # Access system prompt item
MessageItem(role='system', content=[...])
>>> chat.items[0].text # Get text content
'You are helpful.'
>>> chat.clear() # Clear conversation (keeps system prompt)
>>> chat.reset() # Reset everything including system prompt
Quick Reference
Properties
| Name | Type |
|---|---|
| chat_template | PromptTemplate \| None |
| client | Client \| AsyncClient \| None |
| items | ConversationItems |
| last_response | Response \| StreamingResponse \| AsyncStreamingResponse \| None |
| max_context_length | int \| None |
| messages | list |
| owns_client | bool |
| prompt_id | str \| None |
| router | Router \| None |
| session_id | str \| None |
| source_doc_id | str \| None |
| system | str \| None |
Methods
| Method | Description |
|---|---|
| __call__() | Send a message and get a streaming response (callable syntax). |
| append() | Append a message to the conversation. |
| append_hidden() | Append a hidden message to the conversation. |
| clear() | Clear conversation history (keeps system prompt and settings). |
| close() | Close the chat and release resources immediately. |
| count_tokens() | Count tokens in current history or a hypothetical message. |
| fork() | Fork this chat to explore alternative conversation paths. |
| from_dict() | Deserialize from dictionary. |
| inherit_tags() | Copy tags from the prompt document to this conversation. |
| insert() | Insert a message at the specified index. |
| pop() | Remove and discard the last message. |
| preview_prompt() | Return the exact formatted prompt that would be sent to the model. |
| regenerate() | Regenerate the last conversation turn. |
| remove() | Remove message at the specified index. |
| reset() | Reset everything including system prompt. |
| send() | Send a message and get a response (synchronous). |
| set_item_parent() | Set parent_item_id for an item by index. |
| set_item_validation_flags() | Set structured validation flags for an item by index. |
| to_dict() | Serialize to dictionary. |
| to_json() | Get messages as JSON string (from Zig). |
Properties
chat_template: PromptTemplate | None
Get the custom chat template, if any.
client: Client | AsyncClient | None
Get the client used by this chat.
items: ConversationItems
Read-only access to conversation as typed Items.
last_response: Response | StreamingResponse | AsyncStreamingResponse | None
Get the last response from generation.
max_context_length: int | None
Get the model's maximum context length.
messages: list
Read-only view of conversation as standard OpenAI-format messages.
owns_client: bool
True if this Chat owns its Client (standalone mode).
prompt_id: str | None
The prompt document ID for this conversation.
When set, this links the conversation to a prompt/persona document. The prompt document can provide the system prompt content and tags that can be inherited via inherit_tags().
This is stored on the Chat object and used by inherit_tags(). For persistent lineage tracking in session records, use source_doc_id.
router: Router | None
Get the router used by this chat.
session_id: str | None
The session identifier for this conversation.
source_doc_id: str | None
The source document ID for lineage tracking.
Links this conversation to the prompt/persona document that spawned it. Used for tracking which document was used to create the conversation.
system: str | None
Get the system prompt.
Methods
def __call__(
self,
message: str | list[dict] | MessageItem | list[MessageItem],
config: GenerationConfig | None = None,
stream: bool = True,
on_token: Callable[[str], None] | None = None,
response_format: type | dict | Grammar | None = None,
**kwargs: Any
) → Response | StreamingResponse
Send a message and get a streaming response (callable syntax).
This is the primary way to chat. Call the Chat object directly with your message. By default, returns a StreamingResponse for real-time token display. Use stream=False for complete response.
For async usage, use send_async() instead.
Parameters
message: The user's message. Can be:
- A string for simple text messages
- A list of content parts for multimodal input: [{"type": "text", "text": "..."}, {"type": "image", "data": "...", "mime": "image/png"}]
config: Generation configuration override for this call only. Includes structured output settings (schema_strategy, inject_schema_prompt, allow_thinking, max_thinking_tokens).
stream: If True (default), returns StreamingResponse with tokens arriving incrementally. This provides immediate feedback and matches the industry standard for chat interfaces (ChatGPT, Claude, etc.). If False, returns Response after generation completes; response.text is immediately available.
Why stream=True is default:
Streaming provides real-time feedback as tokens arrive, which:
- Reduces perceived latency (users see progress immediately)
- Prevents confusion about "hanging" during long generations
- Long generations (10+ seconds) with no output appear broken
- Matches industry standard for chat interfaces
Use stream=True for:
- Interactive applications (CLIs, chat interfaces)
- Long generations where users want real-time feedback
- Applications showing progress indicators
- Reducing perceived latency for user-facing apps
Use stream=False for:
- Batch processing (collect all responses at once)
- Simple scripts where you don't need incremental tokens
- API endpoints returning JSON with full text
- Testing/automation where latency doesn't matter
- Cases requiring deterministic timing
Important: StreamingResponse is single-use. Once exhausted, you cannot iterate again. Access .text after iteration for the full accumulated text.
on_token: Optional callback called for each token (streaming only).
response_format: Dataclass type or JSON schema dict for structured output. When provided, the model output will be constrained to match the schema. Use response.parsed to get a hydrated dataclass instance.
**kwargs: Individual parameter overrides (temperature, max_tokens, etc.) for this call only. Does NOT modify chat.config.
Returns
StreamingResponse: If stream=True (default). Single-use iterator. Cache tokens during iteration if needed later. Access .text after exhaustion for the full accumulated text.
Response: If stream=False. Complete response with .text always available immediately.
Raises
StateError: If no router is available (Chat created without model/client).
ValidationError: If an unknown generation parameter is passed.
StructuredOutputError: If response_format schema setup fails.
Configuration Precedence
Per-call overrides do NOT mutate session state. Priority (high to low):
1. **kwargs (e.g., temperature=0.1) - this call only
2. config parameter - this call only
3. chat.config - session default (unchanged by per-call overrides)
To permanently change session config: chat.config = GenerationConfig(...)
Example - Streaming (default)
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> for token in chat("Tell me a joke"):
... print(token, end="", flush=True)
Example - Non-streaming
>>> response = chat("What is 2+2?", stream=False)
>>> print(response)
4
Example - Structured output
>>> from dataclasses import dataclass
>>> @dataclass
... class Answer:
... value: int
>>> response = chat("What is 2+2?", response_format=Answer, stream=False)
>>> response.parsed.value
4
Example - Per-call override (session unchanged)
>>> chat = Chat("model", config=GenerationConfig(temperature=0.7))
>>> response = chat("Hi", temperature=0.1) # Uses 0.1 for this call
>>> chat.config.temperature # Still 0.7 (unchanged)
0.7
Example - Multi-turn
>>> response = chat("What is 2+2?", stream=False)
>>> response = response.append("Why?") # Inherits stream=False
def append(
self,
role_or_item: str | MessageItem,
content: str | None = None,
hidden: bool = False
) → Self
Append a message to the conversation.
Can be called with either:
- Two arguments: append(role, content) - a role string and a content string
- One argument: append(item) - a MessageItem object
Parameters
role_or_item: Either a role string ("system", "user", "assistant", "developer") or a MessageItem object.
content: Message content (required when first arg is a role string).
hidden: Hide from UI history while keeping in LLM context.
Returns
self, for chaining.
Raises
ValidationError: If role is invalid or arguments are malformed.
StateError: If append fails.
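A short sketch of both calling forms and chaining; append() only records messages in the history, it does not trigger generation, and the message text here is arbitrary:
>>> chat = Chat(system="You are helpful.")
>>> chat.append("user", "What is 2+2?").append("assistant", "4")  # returns self, so calls chain
>>> chat.append("user", "And 3+3?")
>>> len(chat.messages)  # system prompt + three appended messages
4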
def clear(self) → Self
Clear conversation history (keeps system prompt and settings).
Returns
self, for chaining.
def close(self) → None
Close the chat and release resources immediately.
If this Chat created its own internal Client (via model="..."), the Client and its Engine are closed, freeing memory.
If this Chat uses a shared Client (via client=...), only the lightweight chat state is freed. The Client stays alive.
Safe to call multiple times.
Example - Explicit cleanup in loops
>>> for model in ["Qwen/0.5B", "Qwen/1.5B", "Qwen/4B"]:
... chat = Chat(model)
... print(chat("Hello"))
... chat.close() # Free memory before loading next model
Example - Context manager (preferred)
>>> with Chat("Qwen/0.5B") as chat:
... print(chat("Hello"))
... # Memory freed automatically here
def count_tokens(self, message: str | None = None) → int
Count tokens in current history or a hypothetical message.
Parameters
message: Optional message to count. If None, counts current history.
Returns
Token count.
Raises
StateError: If no model configured.
GenerationError: If token counting fails.
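A hedged sketch of checking the token budget before sending (max_context_length may be None for some backends, hence the arbitrary fallback value):
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> history = chat.count_tokens()                         # tokens already in the conversation
>>> next_msg = chat.count_tokens("Summarize the above.")  # hypothetical next message
>>> limit = chat.max_context_length or 4096               # fallback if unknown
>>> if history + next_msg < limit:
...     response = chat.send("Summarize the above.")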
def fork(self) → Chat
Fork this chat to explore alternative conversation paths.
Creates an independent copy of the chat with the same history, config, and client reference. Changes to the forked chat do not affect the original.
Returns
New Chat with copied state.
Raises
StateErrorIf message history cannot be copied.
Example
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> response = chat("I have chicken")
>>>
>>> # Fork to try different directions
>>> asian = response.chat.fork()
>>> italian = response.chat.fork()
>>>
>>> asian("Suggest an Asian recipe")
>>> italian("Suggest an Italian recipe")
>>>
>>> # Original unchanged
>>> print(len(chat.items)) # Same as before forking
def from_dict(
cls,
data: dict,
model: str | None = None
) → Self
Deserialize from dictionary.
Restores a Chat/AsyncChat from a dict created by to_dict(). Use this to resume conversations from a database or file.
Parameters
data: Dict from to_dict(). If items is provided, full ItemRecord data is loaded; otherwise only OpenAI-format messages are restored.
model: Model to load (HuggingFace ID or local path).
Returns
New instance with restored state.
Raises
StateError: If message loading fails.
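A minimal round trip through to_dict()/from_dict(), assuming the serialized dict is JSON-compatible as the database/file use case suggests; the file path and model ID are placeholders:
>>> import json
>>> chat = Chat("Qwen/Qwen3-0.6B", system="You are helpful.")
>>> chat.send("What is 2+2?")
>>> with open("session.json", "w") as f:
...     json.dump(chat.to_dict(), f)
>>> with open("session.json") as f:
...     restored = Chat.from_dict(json.load(f), model="Qwen/Qwen3-0.6B")
>>> restored.messages[-1]["role"]
'assistant'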
def inherit_tags(self) → None
Copy tags from the prompt document to this conversation.
Requires both prompt_id and session_id to be set, and requires TaluDB storage to be configured.
Raises
StateError: If chat is closed.
ValidationError: If prompt_id or session_id is not set.
IOError: If tag inheritance fails.
def insert(
self,
index: int,
role: str,
content: str,
hidden: bool = False
) → Self
Insert a message at the specified index.
Parameters
index: Position to insert at (0-based).
role: Message role ("system", "user", "assistant", "developer").
content: Message text content.
hidden: Hide from UI history while keeping in LLM context.
Returns
self, for chaining.
Raises
ValidationError: If role is invalid.
StateError: If index is out of bounds or insert fails.
def pop(self) → Self
Remove and discard the last message.
Returns
self, for chaining.
Raises
StateError: If no messages to remove.
def preview_prompt(
self,
add_generation_prompt: bool = True,
config: GenerationConfig | None = None
) → str
Return the exact formatted prompt that would be sent to the model.
This is a read-only inspection tool for debugging template logic or verifying system prompts. It does NOT send anything to the engine or affect the conversation state.
Parameters
add_generation_prompt: If True (default), include the assistant turn marker at the end (e.g., "<|im_start|>assistant\n").
config: Optional GenerationConfig. If provided and contains a chat_template, that template will be used instead of the session-level or model default template.
Returns
The formatted prompt string.
Raises
StateError: If no engine is available and no custom template is set.
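For example, inspecting the rendered prompt without generating anything; the exact markup depends on the model's chat template, so the ChatML-style output below is only illustrative:
>>> chat = Chat("Qwen/Qwen3-0.6B", system="You are helpful.")
>>> chat.append("user", "Hello!")
>>> print(chat.preview_prompt())
<|im_start|>system
You are helpful.<|im_end|>
<|im_start|>user
Hello!<|im_end|>
<|im_start|>assistant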
def regenerate(
self,
message: str | None = None,
config: GenerationConfig | None = None,
stream: bool = False,
response_format: type | dict | Grammar | None = None,
**kwargs: Any
) → Response | StreamingResponse
Regenerate the last conversation turn.
This method unwinds the conversation to the previous user message and triggers generation again. Use it to retry a response or edit the last user message.
The operation is atomic: it truncates to the point before the last user message and then sends (either the original or new text). This ensures fresh item IDs and timestamps for auditability.
Parameters
message: Optional new text for the user message. If provided, replaces the last user message with this text. If None, retries with the existing user message text.
config: Generation configuration override.
stream: If True, returns StreamingResponse.
response_format: Dataclass type or JSON schema dict for structured output.
**kwargs: Individual parameter overrides (temperature, max_tokens, etc.).
Returns
Response | StreamingResponse The new response from regeneration.
Raises
StateError: If no user message exists to regenerate from.
Example
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> chat("Tell me a joke")
>>> # Didn't like the joke? Retry:
>>> chat.regenerate()
>>> # Or edit and retry:
>>> chat.regenerate(message="Tell me a better joke")
>>> # With different parameters:
>>> chat.regenerate(temperature=1.2)
def remove(self, index: int) → Self
Remove message at the specified index.
Parameters
index: Index of message to remove (0-based).
Returns
self, for chaining.
Raises
StateError: If index is out of bounds.
def reset(self) → Self
Reset everything including system prompt.
Returns
self, for chaining.
def send(
self,
message: str | list[dict] | MessageItem | list[MessageItem],
config: GenerationConfig | None = None,
tools: list[Callable[..., Any]] | None = None,
stream: bool = False,
on_token: Callable[[str], None] | None = None,
response_format: type | dict | Grammar | None = None,
**kwargs: Any
) → Response | StreamingResponse
Send a message and get a response (synchronous).
This is the explicit sync method. For streaming default, use chat(). For async, use send_async().
Parameters
message: The user's message. Can be:
- A string for simple text messages
- A list of content parts for multimodal input: [{"type": "text", "text": "..."}, {"type": "image", "data": "...", "mime": "image/png"}]
config: Generation configuration override. Includes structured output settings (schema_strategy, inject_schema_prompt, allow_thinking, max_thinking_tokens).
tools: Optional list of @tool-decorated functions to enable tool calling.
stream: If True, returns StreamingResponse. If False (default), returns Response.
on_token: Optional callback called for each token (streaming only).
response_format: Dataclass type or JSON schema dict for structured output.
**kwargs: Individual parameter overrides (temperature, max_tokens, etc.).
Returns
Response: If stream=False (default). StreamingResponse: If stream=True.
Raises
StateError: If no router is available (Chat created without model/client).
ValidationError: If an unknown generation parameter is passed.
StructuredOutputError: If response_format schema setup fails.
Example
>>> response = chat.send("What is 2+2?")
>>> print(response)
4
>>> response = response.append("Why?") # Continues with same mode
def set_item_parent(
self,
item_index: int,
parent_item_id: int | None
) → None
Set parent_item_id for an item by index.
Raises
StateError: If the operation fails.
def set_item_validation_flags(
self,
item_index: int,
json_valid: bool,
schema_valid: bool,
repaired: bool = False
) → None
Set structured validation flags for an item by index.
Use this after structured parsing/validation to mark JSON/schema validity.
Raises
StateError: If the operation fails.
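A hedged sketch of recording the flags after structured parsing, assuming the assistant reply is the last item in the conversation and reusing the Answer dataclass from the structured output example above:
>>> response = chat.send("What is 2+2?", response_format=Answer)
>>> idx = len(chat.items) - 1  # index of the assistant reply (assumption)
>>> try:
...     _ = response.parsed
...     chat.set_item_validation_flags(idx, json_valid=True, schema_valid=True)
... except Exception:
...     chat.set_item_validation_flags(idx, json_valid=False, schema_valid=False)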
def to_dict(self) → dict
Serialize to dictionary.
def to_json(self) → str
Get messages as JSON string (from Zig).
Returns
JSON string of messages array in OpenAI Completions format. This is an interchange format and does not include storage-only metadata.
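The returned string parses directly with the standard json module; a small sketch, assuming a system prompt was set so it appears at index 0:
>>> import json
>>> messages = json.loads(chat.to_json())
>>> messages[0]["role"]
'system'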
class Response
Response(
self,
text: str = '',
tokens: list[int] | None = None,
finish_reason: str | None = None,
usage: Usage | None = None,
timings: Timings | None = None,
model: str | None = None,
logprobs: list[TokenLogprob] | None = None,
tool_calls: list[ToolCall] | None = None,
chat: Chat | None = None,
metadata: ResponseMetadata | None = None,
_response_format: type | dict | Grammar | None = None,
_stream_mode: bool = False,
_msg_index: int | None = None,
_content: list[ContentPart] | None = None,
_prompt: str | None = None
)
Completed generation result.
Wraps the result of a non-streaming generation. Behaves like a string for simple use but exposes rich metadata when needed.
The .text property contains the complete generated text, available immediately without iteration. Convert to string with str(response) or access directly via response.text.
Attributes
text: The generated text content (always available immediately).
tokens: List of generated token IDs.
finish_reason: Why generation stopped (eos_token, length, stop_sequence).
usage: Token usage statistics.
timings: Generation timing breakdown.
model: Model identifier that generated this response.
logprobs: Token log probabilities (if requested).
Example - Casual use
>>> response = chat("Hello!")
>>> print(response) # Works like a string
Hi there!
>>> if "hello" in response.lower():
... print("Greeting detected")
Example - Power user
>>> response = chat("Hello!")
>>> print(f"Used {response.usage.total_tokens} tokens")
>>> print(f"Finished due to: {response.finish_reason}")
>>> print(f"Model: {response.model}")
Quick Reference
Properties
| Name | Type |
|---|---|
| chat | Chat \| None |
| content | list[ContentPart] |
| finish_reason | str |
| logprobs | list[TokenLogprob] \| None |
| model | str \| None |
| parsed | Any |
| prompt | str \| None |
| text | str |
| timings | Timings \| None |
| tokens | list[int] |
| tool_calls | list[ToolCall] \| None |
| usage | Usage \| None |
Methods
| Method | Description |
|---|---|
| append() | Continue the conversation with a follow-up message. |
| endswith() | Check if text ends with suffix. |
| lower() | Return text in lowercase. |
| replace() | Replace occurrences in text. |
| split() | Split text. |
| startswith() | Check if text starts with prefix. |
| strip() | Return text with leading/trailing chars removed. |
| submit_tool_result() | Submit a tool result and continue generation. |
| to_dict() | Convert response to a JSON-serializable dictionary. |
| upper() | Return text in uppercase. |
Properties
chat: Chat | None
The Chat that generated this response.
content: list[ContentPart]
Structured content parts for multimodal output symmetry.
Returns a list of content parts, enabling symmetric handling of input and output. For text-only responses, this returns [OutputText(text=...)]. Future multimodal models will return additional part types (OutputImage, etc.).
This property is the source of truth for response content. The .text property is a convenience that concatenates all text parts.
Returns
List of content parts (currently OutputText for text responses).
Example
>>> response = chat("Hello!")
>>> for part in response.content:
... if part.type == ContentType.OUTPUT_TEXT:
... print(part.text)
Currently only returns OutputText. Future versions may include OutputImage, OutputAudio, etc. as models evolve.
finish_reason: str
Why generation stopped.
logprobs: list[TokenLogprob] | None
Token log probabilities (if requested).
model: str | None
Model identifier that generated this response.
parsed: Any
Parse and validate the response against the response_format schema.
If a response_format was provided during generation, this property parses the response text as JSON and validates/hydrates it into the specified type (dataclass or Pydantic model).
Returns
The parsed and validated response object, or None if no response_format was specified.
Raises
IncompleteJSONError: If finish_reason is "length" and JSON is malformed.
json.JSONDecodeError: If the response text is not valid JSON.
SchemaValidationError: If the parsed data doesn't match the schema.
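A defensive-parsing sketch around these exceptions, reusing the Answer dataclass from the structured output example above; the exact recovery strategy (and the import paths for the library-specific error types) is left to the caller:
>>> import json
>>> response = chat.send("What is 2+2?", response_format=Answer)
>>> if response.finish_reason == "length":
...     print("Output may be truncated; consider retrying with a larger max_tokens")
>>> try:
...     value = response.parsed.value
... except json.JSONDecodeError:
...     value = None  # output was not valid JSON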
prompt: str | None
The fully rendered prompt sent to the model (audit trail).
Contains the exact string that was fed to the model engine after all templating, system prompt injection, and formatting was applied. Useful for debugging template issues and understanding exactly what the model saw.
Returns
The rendered prompt string, or None if not available.
Example
>>> response = chat("Hello!")
>>> print(response.prompt)
<|im_start|>system
You are a helpful assistant.
<|im_end|>
<|im_start|>user
Hello!
<|im_end|>
<|im_start|>assistant
Only available for responses generated through Chat. May be None for responses from remote APIs or when prompt wasn't captured.
text: str
The generated text content.
timings: Timings | None
Generation timing breakdown.
tokens: list[int]
List of generated token IDs.
tool_calls: list[ToolCall] | None
Tool calls requested by the model (if any).
usage: Usage | None
Token usage statistics.
Methods
def append(
self,
message: str,
**kwargs: Any
) → Response | StreamingResponse
Continue the conversation with a follow-up message (sync).
This is the primary way to have multi-turn conversations. The append uses the same Chat that generated this response, maintaining context.
Auto-Fork Behavior: If the conversation has moved past this response (i.e., more messages were added after this response was generated), append() automatically forks the conversation and truncates it back to this point before sending the new message. This enables intuitive branching where you can append to any previous response without worrying about conversation state.
The append automatically inherits streaming mode from the original response.
Parameters
message: The follow-up message to send.
**kwargs: Generation parameters (temperature, max_tokens, etc.).
Returns
Response if original was non-streaming, StreamingResponse if streaming.
Raises
StateError: If this response has no associated Chat.
Example - Linear conversation
>>> r1 = chat("What is 2+2?")
>>> r2 = r1.append("Why?") # Continues normally
>>> r3 = r2.append("Thanks!") # Continues normally
Example - Branching
>>> r1 = chat("Idea 1")
>>> r2 = r1.append("Critique it") # chat has [Idea 1, Critique]
>>> r3 = r1.append("Expand on it") # Auto-forks! r3.chat is new
>>> # Original chat unchanged, r3.chat has [Idea 1, Expand]
def endswith(self, suffix: str) → bool
Check if text ends with suffix.
def lower(self) → str
Return text in lowercase.
def replace(
self,
old: str,
new: str,
count: int = -1
) → str
Replace occurrences in text.
def split(
self,
sep: str | None = None,
maxsplit: int = -1
) → list[str]
Split text.
def startswith(self, prefix: str) → bool
Check if text starts with prefix.
def strip(self, chars: str | None = None) → str
Return text with leading/trailing chars removed.
def submit_tool_result(
self,
tool_call_id: str,
result: Any
) → Response
Submit a tool result and continue generation.
When the model requests tool calls (via response.tool_calls), execute them and submit the results back using this method. The model will then continue generation with the tool results in context.
Parameters
tool_call_id: The ID from the tool call (tool_call.id).
result: The result to send back (will be JSON serialized if not str).
Returns
New Response from continued generation.
Raises
StateError: If no Chat session is attached.
Example
>>> response = chat("What's the weather?", tools=[get_weather])
>>> while response.tool_calls:
... for call in response.tool_calls:
... result = call.execute()
... response = response.submit_tool_result(call.id, result)
>>> print(response)
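The example above assumes a get_weather tool already exists. A hedged sketch of what such a function might look like; the import path for the @tool decorator is an assumption and may differ in the actual package:
>>> from talu import tool  # import path is an assumption
>>> @tool
... def get_weather(city: str) -> str:
...     """Return the current weather for a city."""
...     return f"Sunny in {city}"
>>> response = chat.send("What's the weather in Paris?", tools=[get_weather])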
def to_dict(self) → dict[str, Any]
Convert response to a JSON-serializable dictionary.
This solves the "serialization trap" where Response acts like a string but isn't directly JSON serializable. Use this for API responses, logging, or any context requiring JSON.
Returns
Dict with text, finish_reason, model, and usage (if available).
Example - FastAPI endpoint
>>> @app.post("/chat")
>>> async def chat_endpoint(message: str):
... response = await chat(message)
... return response.to_dict() # JSON serializable
Example - Logging
>>> import json
>>> response = chat("Hello!")
>>> json.dumps(response.to_dict()) # Works!
Example - Custom response structure
>>> result = {
... "success": True,
... "data": response.to_dict(),
... }
def upper(self) → str
Return text in uppercase.
class StreamingResponse
StreamingResponse(
self,
stream_iterator: Iterator,
on_token: Callable[[str], None] | None = None,
on_complete: Callable[[str], None] | None = None,
tokens: list[int] | None = None,
finish_reason: str | None = None,
usage: Usage | None = None,
timings: Timings | None = None,
model: str | None = None,
logprobs: list[TokenLogprob] | None = None,
tool_calls: list[ToolCall] | None = None,
chat: Chat | None = None,
metadata: ResponseMetadata | None = None,
_response_format: type | dict | Grammar | None = None,
_stream_mode: bool = True,
_hooks: HookManager | None = None,
_generation_start_time: float | None = None,
_prompt: str | None = None
)
Streaming generation result that yields tokens incrementally.
Returned when calling chat(stream=True). Iterate over it to receive tokens in real-time. Text accumulates in .text as you iterate.
Streaming Behavior
StreamingResponse objects are single-use iterators. Once exhausted, you cannot iterate again. If you need the full text later, cache it during iteration:
>>> response = chat("Hello", stream=True)
>>> full_text = "".join(response) # Cache during iteration
>>> print(full_text)
Calling len(response) or accessing response.text after the stream is exhausted returns the cached full text. Iterating multiple times on the same StreamingResponse will yield no tokens on subsequent iterations.
Concurrency
Single-consumer. Do not iterate from multiple threads/tasks.
Attributes
textThe accumulated text (grows during iteration, always available after).
tokensList of generated token IDs (populated after iteration).
finish_reasonWhy generation stopped (available after iteration).
usageToken usage statistics (available after iteration).
timingsGeneration timing breakdown (available after iteration).
modelModel identifier that generated this response.
Example
>>> response = chat("Tell me a joke", stream=True)
>>> for token in response:
... print(token, end="", flush=True)
>>> print()
>>> print(f"Full text: {response.text}")
>>> print(f"Tokens used: {response.usage.total_tokens}")
Example - With callback
>>> def on_token(t): print(t, end="")
>>> response = chat("Hello", stream=True, on_token=on_token)
>>> for _ in response: pass # Drain to trigger callbacks
After iteration completes, you can access .text for the full accumulated text and .usage/.timings for metadata.
Quick Reference
Properties
| Name | Type |
|---|---|
| chat | Chat \| None |
| content | list[ContentPart] |
| finish_reason | str |
| logprobs | list[TokenLogprob] \| None |
| model | str \| None |
| prompt | str \| None |
| text | str |
| timings | Timings \| None |
| tokens | list[int] |
| tool_calls | list[ToolCall] \| None |
| usage | Usage \| None |
Methods
| Method | Description |
|---|---|
| append() | Continue the conversation with a follow-up message. |
| endswith() | Check if text ends with suffix. |
| lower() | Return text in lowercase. |
| replace() | Replace occurrences in text. |
| split() | Split text. |
| startswith() | Check if text starts with prefix. |
| strip() | Return text with leading/trailing chars removed. |
| to_dict() | Convert response to a JSON-serializable dictionary. |
| upper() | Return text in uppercase. |
Properties
chat: Chat | None
The Chat that generated this response.
content: list[ContentPart]
Structured content parts for multimodal output symmetry.
Returns a list of content parts, enabling symmetric handling of input and output. For text-only responses, this returns [OutputText(text=...)]. Future multimodal models will return additional part types (OutputImage, etc.).
This property is the source of truth for response content. The .text property is a convenience that concatenates all text parts.
Returns
List of content parts (currently OutputText for text responses).
Example
>>> response = chat("Hello!")
>>> for part in response.content:
... if part.type == ContentType.OUTPUT_TEXT:
... print(part.text)
Currently only returns OutputText. Future versions may include OutputImage, OutputAudio, etc. as models evolve.
finish_reason: str
Why generation stopped.
logprobs: list[TokenLogprob] | None
Token log probabilities (if requested).
model: str | None
Model identifier that generated this response.
prompt: str | None
The fully rendered prompt (available after iteration completes).
For streaming responses, the prompt is captured after iteration finishes since messages are added during streaming. Access this property after consuming the stream.
Returns
The rendered prompt string, or None if iteration hasn't completed or the prompt couldn't be captured.
text: str
The generated text content.
For StreamingResponse, accessing this property will auto-drain the stream if it hasn't been consumed yet. This ensures that `.text` always returns the complete generated text, regardless of whether the caller explicitly iterated over the response.
Returns
The full generated text content.
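A small sketch of the auto-drain behavior: reading .text without iterating consumes the stream and returns the complete text (token-by-token display is then no longer possible for that response):
>>> response = chat("Tell me a joke", stream=True)
>>> full = response.text  # drains the remaining stream internally
>>> print(full)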
timings: Timings | None
Generation timing breakdown.
tokens: list[int]
List of generated token IDs.
tool_calls: list[ToolCall] | None
Tool calls requested by the model (if any).
usage: Usage | None
Token usage statistics.
Methods
def append(
self,
message: str,
**kwargs: Any
) → StreamingResponse
Continue the conversation with a follow-up message.
Returns a StreamingResponse (inherits streaming mode from this response). See Response.append() for full documentation including auto-fork behavior.
Parameters
message: The follow-up message text.
**kwargs: Generation overrides (temperature, max_tokens, etc.).
Raises
StateError: If this response has no associated Chat.
def endswith(self, suffix: str) → bool
Check if text ends with suffix.
def lower(self) → str
Return text in lowercase.
def replace(
self,
old: str,
new: str,
count: int = -1
) → str
Replace occurrences in text.
def split(
self,
sep: str | None = None,
maxsplit: int = -1
) → list[str]
Split text.
def startswith(self, prefix: str) → bool
Check if text starts with prefix.
def strip(self, chars: str | None = None) → str
Return text with leading/trailing chars removed.
def to_dict(self) → dict[str, Any]
Convert response to a JSON-serializable dictionary.
This solves the "serialization trap" where Response acts like a string but isn't directly JSON serializable. Use this for API responses, logging, or any context requiring JSON.
Returns
Dict with text, finish_reason, model, and usage (if available).
Example - FastAPI endpoint
>>> @app.post("/chat")
>>> async def chat_endpoint(message: str):
... response = await chat(message)
... return response.to_dict() # JSON serializable
Example - Logging
>>> import json
>>> response = chat("Hello!")
>>> json.dumps(response.to_dict()) # Works!
Example - Custom response structure
>>> result = {
... "success": True,
... "data": response.to_dict(),
... }
def upper(self) → str
Return text in uppercase.
class AsyncChat
AsyncChat(
self,
model: str | None = None,
client: AsyncClient | None = None,
config: GenerationConfig | None = None,
system: str | None = None,
session_id: str | None = None,
parent_session_id: str | None = None,
group_id: str | None = None,
ttl_ts: int | None = None,
marker: str = '',
metadata: dict | None = None,
source_doc_id: str | None = None,
prompt_id: str | None = None,
chat_template: str | PromptTemplate | None = None,
storage: Database | None = None,
offline: bool = False,
_defer_session_update: bool = False
)
Async stateful multi-turn chat session.
AsyncChat is the async equivalent of Chat. Use it for building async applications (FastAPI, aiohttp, etc.) where you need non-blocking generation operations.
All generation methods (send(), __call__()) are async and must be awaited.
Separation of Concerns
- AsyncChat manages session state: conversation history, system prompt, templates
- AsyncClient manages infrastructure: model loading, GPU layers, API keys, threading
For custom hardware or backend configuration, create an AsyncClient first:
async with AsyncClient("model", gpu_layers=20, api_key="...") as client:
chat = client.chat(system="You are helpful.")
Architecture
AsyncChat shares the same Zig backend as Chat. Model weights are cached globally, so creating AsyncChat for the same model as an existing Chat shares memory efficiently.
Concurrency
Safe to share across asyncio tasks. Not thread-safe across OS threads. Each task should maintain its own conversation flow to avoid interleaving.
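A minimal sketch of the per-task pattern, with one AsyncChat per asyncio task over a shared AsyncClient (the prompts are illustrative):
>>> import asyncio
>>> async def main():
...     async with AsyncClient("Qwen/Qwen3-0.6B") as client:
...         async def worker(prompt):
...             chat = client.chat()  # one AsyncChat per task
...             return str(await chat(prompt, stream=False))
...         print(await asyncio.gather(worker("Hi"), worker("Hello")))
>>> asyncio.run(main())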
Parameters
model: Model to load (HuggingFace ID or local path). Creates a default AsyncClient. For custom configuration (GPU layers, API keys, etc.), use AsyncClient instead.
client: Existing AsyncClient to use (for multi-user serving or custom config).
config: Default GenerationConfig for this session.
system: Optional system prompt.
session_id: Optional session identifier for this conversation.
parent_session_id: Optional parent session identifier for forks.
marker: Session marker for storage backends (default: "" = normal/unmarked). Values: "pinned", "archived", "deleted", or "" (normal).
metadata: Optional session metadata dict (tags, UI state, notes).
chat_template: Custom chat template to use.
storage: Storage for messages. Use Database("talu://<path>") for TaluDB persistence (requires session_id).
offline: If True, disallow network access when resolving model URIs.
Example - Basic async usage
>>> chat = AsyncChat("Qwen/Qwen3-0.6B", system="You are helpful.")
>>> response = await chat("What is 2+2?")
>>> print(response)
Example - Remote backend (use AsyncClient for backend config)
>>> async with AsyncClient("gpt-4", base_url="http://localhost:8080/v1", api_key="sk-...") as client:
... chat = client.chat()
... response = await chat("Hello!")
Example - Multi-turn async conversation
>>> response = await chat("Hello!")
>>> response = await response.append("Tell me more")
Example - Async streaming
>>> chat = AsyncChat("Qwen/Qwen3-0.6B")
>>> response = await chat("Tell me a story", stream=True)
>>> async for chunk in response:
... print(chunk, end="", flush=True)
Raises
ValidationError: If both model and client are provided.
MemoryError: If AsyncChat creation fails (insufficient memory).
Example - Multi-user async serving
>>> async with AsyncClient("Qwen/Qwen3-0.6B") as client:
... user1 = client.chat(system="You are helpful.")
... user2 = client.chat(system="You are a pirate.")
... response = await user1("Hello!")
... response = await user2("Ahoy!")
Quick Reference
Properties
| Name | Type |
|---|---|
| chat_template | PromptTemplate \| None |
| client | Client \| AsyncClient \| None |
| items | ConversationItems |
| last_response | Response \| StreamingResponse \| AsyncStreamingResponse \| None |
| max_context_length | int \| None |
| messages | list |
| owns_client | bool |
| prompt_id | str \| None |
| router | Router \| None |
| session_id | str \| None |
| source_doc_id | str \| None |
| system | str \| None |
Methods
| Method | Description |
|---|---|
| __call__() | Send a message and get an async streaming response (callable syntax). |
| append() | Append a message to the conversation. |
| append_hidden() | Append a hidden message to the conversation. |
| clear() | Clear conversation history (keeps system prompt and settings). |
| close() | Close the chat and release resources immediately. |
| count_tokens() | Count tokens in current history or a hypothetical message. |
| fork() | Fork this chat to explore alternative conversation paths. |
| from_dict() | Deserialize from dictionary. |
| inherit_tags() | Copy tags from the prompt document to this conversation. |
| insert() | Insert a message at the specified index. |
| pop() | Remove and discard the last message. |
| preview_prompt() | Return the exact formatted prompt that would be sent to the model. |
| regenerate() | Regenerate the last conversation turn. |
| remove() | Remove message at the specified index. |
| reset() | Reset everything including system prompt. |
| send() | Send a message and get a response (async, non-streaming by default). |
| set_item_parent() | Set parent_item_id for an item by index. |
| set_item_validation_flags() | Set structured validation flags for an item by index. |
| to_dict() | Serialize to dictionary. |
| to_json() | Get messages as JSON string (from Zig). |
Properties
chat_template: PromptTemplate | None
Get the custom chat template, if any.
client: Client | AsyncClient | None
Get the client used by this chat.
items: ConversationItems
Read-only access to conversation as typed Items.
last_response: Response | StreamingResponse | AsyncStreamingResponse | None
Get the last response from generation.
max_context_length: int | None
Get the model's maximum context length.
messages: list
Read-only view of conversation as standard OpenAI-format messages.
owns_client: bool
True if this Chat owns its Client (standalone mode).
prompt_id: str | None
The prompt document ID for this conversation.
When set, this links the conversation to a prompt/persona document. The prompt document can provide the system prompt content and tags that can be inherited via inherit_tags().
This is stored on the Chat object and used by inherit_tags(). For persistent lineage tracking in session records, use source_doc_id.
router: Router | None
Get the router used by this chat.
session_id: str | None
The session identifier for this conversation.
source_doc_id: str | None
The source document ID for lineage tracking.
Links this conversation to the prompt/persona document that spawned it. Used for tracking which document was used to create the conversation.
system: str | None
Get the system prompt.
Methods
def __call__(
self,
message: str | list[dict] | MessageItem | list[MessageItem],
config: GenerationConfig | None = None,
stream: bool = True,
on_token: Callable[[str], None] | None = None,
response_format: type | dict | Grammar | None = None,
**kwargs: Any
) → AsyncResponse | AsyncStreamingResponse
Send a message and get an async streaming response (callable syntax).
This is the primary async way to chat. Call the AsyncChat object directly with your message. By default, returns an AsyncStreamingResponse.
Parameters
message: The user's message.
config: Generation configuration override. Includes structured output settings (schema_strategy, inject_schema_prompt, allow_thinking, max_thinking_tokens).
stream: If True (default), returns AsyncStreamingResponse.
on_token: Optional callback called for each token.
response_format: Dataclass type or JSON schema dict for structured output.
**kwargs: Individual parameter overrides.
Returns
AsyncStreamingResponse: If stream=True (default). AsyncResponse: If stream=False.
Example
>>> response = await chat("Tell me a joke")
>>> async for token in response:
... print(token, end="", flush=True)
def append(
self,
role_or_item: str | MessageItem,
content: str | None = None,
hidden: bool = False
) → Self
Append a message to the conversation.
Can be called with either:
- Two arguments: append(role, content) - a role string and a content string
- One argument: append(item) - a MessageItem object
Parameters
role_or_item: Either a role string ("system", "user", "assistant", "developer") or a MessageItem object.
content: Message content (required when first arg is a role string).
hidden: Hide from UI history while keeping in LLM context.
Returns
self, for chaining.
Raises
ValidationError: If role is invalid or arguments are malformed.
StateError: If append fails.
def clear(self) → Self
Clear conversation history (keeps system prompt and settings).
Returns
self, for chaining.
def close(self) → None
Close the chat and release resources immediately.
If this AsyncChat created its own internal AsyncClient (via model="..."), the Client and its Engine are closed, freeing memory.
If this AsyncChat uses a shared AsyncClient (via client=...), only the lightweight chat state is freed. The Client stays alive.
Safe to call multiple times.
Example - Explicit cleanup in loops
>>> for model in ["Qwen/0.5B", "Qwen/1.5B", "Qwen/4B"]:
... chat = AsyncChat(model)
... print(await chat("Hello"))
... await chat.close() # Free memory before loading next model
Example - Context manager (preferred)
>>> async with AsyncChat("Qwen/0.5B") as chat:
... print(await chat("Hello"))
... # Memory freed automatically here
def count_tokens(self, message: str | None = None) → int
Count tokens in current history or a hypothetical message.
Parameters
message: Optional message to count. If None, counts current history.
Returns
Token count.
Raises
StateError: If no model configured.
GenerationError: If token counting fails.
def fork(self) → AsyncChat
Fork this chat to explore alternative conversation paths.
Returns
New AsyncChat with copied state.
Raises
StateError: If message copying fails.
def from_dict(
cls,
data: dict,
model: str | None = None
) → Self
Deserialize from dictionary.
Restores a Chat/AsyncChat from a dict created by to_dict(). Use this to resume conversations from a database or file.
Parameters
data: Dict from to_dict(). If items is provided, full ItemRecord data is loaded; otherwise only OpenAI-format messages are restored.
model: Model to load (HuggingFace ID or local path).
Returns
New instance with restored state.
Raises
StateError: If message loading fails.
def inherit_tags(self) → None
Copy tags from the prompt document to this conversation.
Requires both prompt_id and session_id to be set, and requires TaluDB storage to be configured.
Raises
StateError: If chat is closed.
ValidationError: If prompt_id or session_id is not set.
IOError: If tag inheritance fails.
def insert(
self,
index: int,
role: str,
content: str,
hidden: bool = False
) → Self
Insert a message at the specified index.
Parameters
index: Position to insert at (0-based).
role: Message role ("system", "user", "assistant", "developer").
content: Message text content.
hidden: Hide from UI history while keeping in LLM context.
Returns
self, for chaining.
Raises
ValidationError: If role is invalid.
StateError: If index is out of bounds or insert fails.
def pop(self) → Self
Remove and discard the last message.
Returns
self, for chaining.
Raises
StateError: If no messages to remove.
def preview_prompt(
self,
add_generation_prompt: bool = True,
config: GenerationConfig | None = None
) → str
Return the exact formatted prompt that would be sent to the model.
This is a read-only inspection tool for debugging template logic or verifying system prompts. It does NOT send anything to the engine or affect the conversation state.
Parameters
add_generation_prompt: If True (default), include the assistant turn marker at the end (e.g., "<|im_start|>assistant\n").
config: Optional GenerationConfig. If provided and contains a chat_template, that template will be used instead of the session-level or model default template.
Returns
The formatted prompt string.
Raises
StateError: If no engine is available and no custom template is set.
def regenerate(
self,
message: str | None = None,
config: GenerationConfig | None = None,
stream: bool = False,
response_format: type | dict | Grammar | None = None,
**kwargs: Any
) → AsyncResponse | AsyncStreamingResponse
Regenerate the last conversation turn.
This method unwinds the conversation to the previous user message and triggers generation again. Use it to retry a response or edit the last user message.
The operation is atomic: it truncates to the point before the last user message and then sends (either the original or new text). This ensures fresh item IDs and timestamps for auditability.
Parameters
message: Optional new text for the user message. If provided, replaces the last user message with this text. If None, retries with the existing user message text.
config: Generation configuration override.
stream: If True, returns AsyncStreamingResponse.
response_format: Dataclass type or JSON schema dict for structured output.
**kwargs: Individual parameter overrides (temperature, max_tokens, etc.).
Returns
AsyncResponse | AsyncStreamingResponse The new response from regeneration.
Raises
StateError: If no user message exists to regenerate from.
Example
>>> chat = AsyncChat("Qwen/Qwen3-0.6B")
>>> await chat("Tell me a joke")
>>> # Didn't like the joke? Retry:
>>> await chat.regenerate()
>>> # Or edit and retry:
>>> await chat.regenerate(message="Tell me a better joke")
def remove(self, index: int) → Self
Remove message at the specified index.
Parameters
index: Index of message to remove (0-based).
Returns
self, for chaining.
Raises
StateError: If index is out of bounds.
def reset(self) → Self
Reset everything including system prompt.
Returns
self, for chaining.
def send(
self,
message: str | list[dict] | MessageItem | list[MessageItem],
config: GenerationConfig | None = None,
tools: list[Callable[..., Any]] | None = None,
stream: bool = False,
on_token: Callable[[str], None] | None = None,
response_format: type | dict | Grammar | None = None,
**kwargs: Any
) → AsyncResponse | AsyncStreamingResponse
Send a message and get a response (async, non-streaming by default).
Parameters
messageThe user's message.
configGeneration configuration override. Includes structured output settings (schema_strategy, inject_schema_prompt, allow_thinking, max_thinking_tokens).
toolsOptional list of @tool-decorated functions to enable tool calling.
streamIf True, returns AsyncStreamingResponse. If False (default), AsyncResponse.
on_tokenOptional callback called for each token.
response_formatDataclass type or JSON schema dict for structured output.
**kwargsIndividual parameter overrides.
Returns
AsyncResponse: If stream=False (default). AsyncStreamingResponse: If stream=True.
Raises
StateErrorIf no router is available (AsyncChat created without model/client).
ValidationErrorIf an unknown generation parameter is passed.
StructuredOutputErrorIf response_format schema setup fails.
Example
>>> response = await chat.send("What is 2+2?")
>>> print(response)
4
>>> response = await response.append("Why?")
def set_item_parent(
self,
item_index: int,
parent_item_id: int | None
) → None
Set parent_item_id for an item by index.
Raises
StateErrorIf the operation fails.
def set_item_validation_flags(
self,
item_index: int,
json_valid: bool,
schema_valid: bool,
repaired: bool = False
) → None
Set structured validation flags for an item by index.
Use this after structured parsing/validation to mark JSON/schema validity.
Raises
StateErrorIf the operation fails.
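Example
A sketch of recording the outcome of your own schema check; the item index is illustrative:
>>> chat.set_item_validation_flags(3, json_valid=True, schema_valid=False, repaired=False)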
def to_dict(self) → dict
Serialize to dictionary.
def to_json(self) → str
Get messages as JSON string (from Zig).
Returns
JSON string of messages array in OpenAI Completions format. This is an interchange format and does not include storage-only metadata.
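Example
A brief sketch of round-tripping the interchange JSON; it assumes each message carries OpenAI-style "role" and "content" fields:
>>> import json
>>> messages = json.loads(chat.to_json())
>>> for m in messages:
...     print(m["role"], str(m["content"])[:40])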
class AsyncResponse
AsyncResponse(
self,
text: str = '',
tokens: list[int] | None = None,
finish_reason: str | None = None,
usage: Usage | None = None,
timings: Timings | None = None,
model: str | None = None,
logprobs: list[TokenLogprob] | None = None,
tool_calls: list[ToolCall] | None = None,
chat: AsyncChat | None = None,
metadata: ResponseMetadata | None = None,
_response_format: type | dict | Grammar | None = None,
_stream_mode: bool = False,
_content: list[ContentPart] | None = None,
_prompt: str | None = None
)
Async completed generation result.
Returned by AsyncChat for non-streaming generation. Contains the complete generated text and metadata. Behaves like a string for simple use but exposes rich metadata when needed.
The append() method is async and must be awaited.
Attributes
textThe generated text content.
tokensList of generated token IDs.
finish_reasonWhy generation stopped (eos_token, length, stop_sequence).
usageToken usage statistics.
timingsGeneration timing breakdown.
modelModel identifier that generated this response.
logprobsToken log probabilities (if requested).
Example
>>> response = await chat.send("Hello!")
>>> print(response) # Works like a string
>>> print(f"Used {response.usage.total_tokens} tokens")
Example - Multi-turn
>>> response = await chat.send("What is 2+2?")
>>> response = await response.append("Why?")
>>> response = await response.append("Are you sure?")
Quick Reference
Properties
| Name | Type |
|---|---|
| chat | Chat \| None |
| content | list[ContentPart] |
| finish_reason | str |
| logprobs | list[TokenLogprob] \| None |
| model | str \| None |
| parsed | Any |
| prompt | str \| None |
| text | str |
| timings | Timings \| None |
| tokens | list[int] |
| tool_calls | list[ToolCall] \| None |
| usage | Usage \| None |
Methods
| Method | Description |
|---|---|
| append() | Continue the conversation with a follow-up mess... |
| endswith() | Check if text ends with suffix. |
| lower() | Return text in lowercase. |
| replace() | Replace occurrences in text. |
| split() | Split text. |
| startswith() | Check if text starts with prefix. |
| strip() | Return text with leading/trailing chars removed. |
| submit_tool_result() | Submit a tool result and continue generation (a... |
| to_dict() | Convert response to a JSON-serializable diction... |
| upper() | Return text in uppercase. |
Properties
chat: Chat | None
The Chat that generated this response.
content: list[ContentPart]
Structured content parts for multimodal output symmetry.
Returns a list of content parts, enabling symmetric handling of input and output. For text-only responses, this returns [OutputText(text=...)]. Future multimodal models will return additional part types (OutputImage, etc.).
This property is the source of truth for response content. The .text property is a convenience that concatenates all text parts.
Returns
List of content parts (currently OutputText for text responses).
Example
>>> response = chat("Hello!")
>>> for part in response.content:
... if part.type == ContentType.OUTPUT_TEXT:
... print(part.text)
Currently only returns OutputText. Future versions may include OutputImage, OutputAudio, etc. as models evolve.
finish_reason: str
Why generation stopped.
logprobs: list[TokenLogprob] | None
Token log probabilities (if requested).
model: str | None
Model identifier that generated this response.
parsed: Any
Parse and validate the response against the response_format schema.
If a response_format was provided during generation, this property parses the response text as JSON and validates/hydrates it into the specified type (dataclass or Pydantic model).
Returns
The parsed and validated response object, or None if no response_format was specified.
Raises
IncompleteJSONErrorIf finish_reason is "length" and JSON is malformed.
json.JSONDecodeErrorIf the response text is not valid JSON.
SchemaValidationErrorIf the parsed data doesn't match the schema.
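Example
A hedged sketch of structured output with a dataclass; the schema and prompt are illustrative:
>>> from dataclasses import dataclass
>>> @dataclass
... class City:
...     name: str
...     population: int
>>> response = await chat.send("Name one large city as JSON.", response_format=City)
>>> city = response.parsed  # City instance, or None if no response_format was set
>>> city.name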
prompt: str | None
The fully rendered prompt sent to the model (audit trail).
Contains the exact string that was fed to the model engine after all templating, system prompt injection, and formatting was applied. Useful for debugging template issues and understanding exactly what the model saw.
Returns
The rendered prompt string, or None if not available.
Example
>>> response = chat("Hello!")
>>> print(response.prompt)
<|im_start|>system
You are a helpful assistant.
<|im_end|>
<|im_start|>user
Hello!
<|im_end|>
<|im_start|>assistant
Only available for responses generated through Chat. May be None for responses from remote APIs or when prompt wasn't captured.
text: str
The generated text content.
timings: Timings | None
Generation timing breakdown.
tokens: list[int]
List of generated token IDs.
tool_calls: list[ToolCall] | None
Tool calls requested by the model (if any).
usage: Usage | None
Token usage statistics.
Methods
def append(
self,
message: str,
**kwargs: Any
) → AsyncResponse | AsyncStreamingResponse
Continue the conversation with a follow-up message (async).
This is the async way to have multi-turn conversations. The append uses the same AsyncChat that generated this response. Must be awaited.
Auto-Fork Behavior: If the conversation has moved past this response, append() automatically forks the conversation and truncates it back to this point before sending the new message. See Response.append() for details.
The append automatically inherits streaming mode from the original response.
Parameters
messageThe follow-up message to send.
**kwargsGeneration parameters (temperature, max_tokens, etc.).
Returns
AsyncResponse if original was non-streaming, AsyncStreamingResponse if streaming.
Raises
StateErrorIf this response has no associated AsyncChat.
Example
>>> response = await chat.send("What is 2+2?")
>>> response = await response.append("Why?")
>>> response = await response.append("Are you sure?")
def endswith(self, suffix: str) → bool
Check if text ends with suffix.
def lower(self) → str
Return text in lowercase.
def replace(
self,
old: str,
new: str,
count: int = -1
) → str
Replace occurrences in text.
def split(
self,
sep: str | None = None,
maxsplit: int = -1
) → list[str]
Split text.
def startswith(self, prefix: str) → bool
Check if text starts with prefix.
def strip(self, chars: str | None = None) → str
Return text with leading/trailing chars removed.
def submit_tool_result(
self,
tool_call_id: str,
result: Any
) → AsyncResponse
Submit a tool result and continue generation (async).
Parameters
tool_call_idThe ID from the tool call (tool_call.id).
resultThe result to send back (will be JSON serialized if not str).
Returns
New AsyncResponse from continued generation.
Raises
StateErrorIf no AsyncChat session is attached.
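Example
A sketch of one manual tool round-trip; get_weather stands in for a @tool-decorated function of your own, and the result is assumed to be JSON-serializable:
>>> response = await chat.send("What's the weather in Oslo?", tools=[get_weather])
>>> if response.tool_calls:
...     call = response.tool_calls[0]
...     result = call.execute()  # run the mapped Python function
...     response = await response.submit_tool_result(call.id, result)
>>> print(response)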
def to_dict(self) → dict[str, Any]
Convert response to a JSON-serializable dictionary.
This solves the "serialization trap" where Response acts like a string but isn't directly JSON serializable. Use this for API responses, logging, or any context requiring JSON.
Returns
Dict with text, finish_reason, model, and usage (if available).
Example - FastAPI endpoint
>>> @app.post("/chat")
>>> async def chat_endpoint(message: str):
... response = await chat(message)
... return response.to_dict() # JSON serializable
Example - Logging
>>> import json
>>> response = chat("Hello!")
>>> json.dumps(response.to_dict()) # Works!
Example - Custom response structure
>>> result = {
... "success": True,
... "data": response.to_dict(),
... }
def upper(self) → str
Return text in uppercase.
class AsyncStreamingResponse
AsyncStreamingResponse(
self,
async_stream_iterator: AsyncIterator,
on_token: Callable[[str], None] | None = None,
on_complete: Callable[[str], None] | None = None,
tokens: list[int] | None = None,
finish_reason: str | None = None,
usage: Usage | None = None,
timings: Timings | None = None,
model: str | None = None,
logprobs: list[TokenLogprob] | None = None,
tool_calls: list[ToolCall] | None = None,
chat: AsyncChat | None = None,
metadata: ResponseMetadata | None = None,
_response_format: type | dict | Grammar | None = None,
_stream_mode: bool = True,
_hooks: HookManager | None = None,
_generation_start_time: float | None = None,
_prompt: str | None = None
)
Async streaming generation result that yields tokens incrementally.
Returned when calling chat.send(stream=True) on AsyncChat. Use async for to receive tokens in real-time. Text accumulates in .text as you iterate.
Concurrency
Single-consumer. Do not iterate from multiple tasks.
Attributes
textThe accumulated text (grows during iteration).
tokensList of generated token IDs (populated after iteration).
finish_reasonWhy generation stopped (available after iteration).
usageToken usage statistics (available after iteration).
timingsGeneration timing breakdown (available after iteration).
modelModel identifier that generated this response.
Example
>>> response = await chat.send("Tell me a joke", stream=True)
>>> async for token in response:
... print(token, end="", flush=True)
>>> print()
>>> print(f"Full text: {response.text}")
Quick Reference
Properties
| Name | Type |
|---|---|
| chat | Chat \| None |
| content | list[ContentPart] |
| finish_reason | str |
| logprobs | list[TokenLogprob] \| None |
| model | str \| None |
| prompt | str \| None |
| text | str |
| timings | Timings \| None |
| tokens | list[int] |
| tool_calls | list[ToolCall] \| None |
| usage | Usage \| None |
Methods
| Method | Description |
|---|---|
| append() | Continue the conversation with a follow-up mess... |
| endswith() | Check if text ends with suffix. |
| lower() | Return text in lowercase. |
| replace() | Replace occurrences in text. |
| split() | Split text. |
| startswith() | Check if text starts with prefix. |
| strip() | Return text with leading/trailing chars removed. |
| to_dict() | Convert response to a JSON-serializable diction... |
| upper() | Return text in uppercase. |
Properties
chat: Chat | None
The Chat that generated this response.
content: list[ContentPart]
Structured content parts for multimodal output symmetry.
Returns a list of content parts, enabling symmetric handling of input and output. For text-only responses, this returns [OutputText(text=...)]. Future multimodal models will return additional part types (OutputImage, etc.).
This property is the source of truth for response content. The .text property is a convenience that concatenates all text parts.
Returns
List of content parts (currently OutputText for text responses).
Example
>>> response = chat("Hello!")
>>> for part in response.content:
... if part.type == ContentType.OUTPUT_TEXT:
... print(part.text)
Currently only returns OutputText. Future versions may include OutputImage, OutputAudio, etc. as models evolve.
finish_reason: str
Why generation stopped.
logprobs: list[TokenLogprob] | None
Token log probabilities (if requested).
model: str | None
Model identifier that generated this response.
prompt: str | None
The fully rendered prompt (available after iteration completes).
For async streaming responses, the prompt is captured after iteration finishes since messages are added during streaming. Access this property after consuming the stream.
Returns
The rendered prompt string, or None if iteration hasn't completed or the prompt couldn't be captured.
text: str
The generated text content.
For AsyncStreamingResponse, this returns the text accumulated so far. To get the complete text, ensure you have consumed the stream first by iterating with async for token in response.
Unlike sync StreamingResponse, AsyncStreamingResponse cannot auto-drain because it would require an async context. If you need the full text, iterate over the response first:
async for _ in response:
    pass
full_text = response.text
Returns
The accumulated text content (partial if stream not exhausted).
timings: Timings | None
Generation timing breakdown.
tokens: list[int]
List of generated token IDs.
tool_calls: list[ToolCall] | None
Tool calls requested by the model (if any).
usage: Usage | None
Token usage statistics.
Methods
def append(
self,
message: str,
**kwargs: Any
) → AsyncStreamingResponse
Continue the conversation with a follow-up message (async streaming).
Returns AsyncStreamingResponse (inherits streaming mode). Must be awaited.
See Response.append() for full documentation including auto-fork behavior.
Parameters
messageThe follow-up message text.
**kwargsGeneration overrides (temperature, max_tokens, etc.).
Raises
StateErrorIf this response has no associated AsyncChat.
Example
>>> response = await chat("Hello") # stream=True by default
>>> async for token in response:
... print(token, end="")
>>> response2 = await response.append("Continue")
>>> async for token in response2:
... print(token, end="")
def endswith(self, suffix: str) → bool
Check if text ends with suffix.
def lower(self) → str
Return text in lowercase.
def replace(
self,
old: str,
new: str,
count: int = -1
) → str
Replace occurrences in text.
def split(
self,
sep: str | None = None,
maxsplit: int = -1
) → list[str]
Split text.
def startswith(self, prefix: str) → bool
Check if text starts with prefix.
def strip(self, chars: str | None = None) → str
Return text with leading/trailing chars removed.
def to_dict(self) → dict[str, Any]
Convert response to a JSON-serializable dictionary.
This solves the "serialization trap" where Response acts like a string but isn't directly JSON serializable. Use this for API responses, logging, or any context requiring JSON.
Returns
Dict with text, finish_reason, model, and usage (if available).
Example - FastAPI endpoint
>>> @app.post("/chat")
>>> async def chat_endpoint(message: str):
... response = await chat(message)
... return response.to_dict() # JSON serializable
Example - Logging
>>> import json
>>> response = chat("Hello!")
>>> json.dumps(response.to_dict()) # Works!
Example - Custom response structure
>>> result = {
... "success": True,
... "data": response.to_dict(),
... }
def upper(self) → str
Return text in uppercase.
class ConversationItems
ConversationItems(
self,
lib: Any,
conversation_ptr: int
)
Read-only view into conversation history.
Reads items directly from the Conversation C API, providing typed access without the Messages adapter layer. Items are read on-demand using zero-copy access to the underlying storage.
Example
>>> # Access items via chat.items
>>> for item in chat.items:
... if isinstance(item, MessageItem):
... print(f"{item.role.name}: {item.text}")
... elif isinstance(item, FunctionCallItem):
... print(f"Tool call: {item.name}({item.arguments})")
Quick Reference
Properties
| Name | Type |
|---|---|
| first | ConversationItem \| None |
| last | ConversationItem \| None |
| system | str \| None |
Methods
| Method | Description |
|---|---|
| count() | S.count(value) -> integer -- return number of o... |
| filter_by_role() | Get all message items with a specific role. |
| filter_by_type() | Get all items of a specific type. |
| index() | S.index(value, [start, [stop]]) -> integer -- r... |
Properties
first: ConversationItem | None
Get the first item, or None if empty.
last: ConversationItem | None
Get the last item, or None if empty.
system: str | None
Get the system message content, or None if no system message.
Methods
def count(self, value)
S.count(value) -> integer -- return number of occurrences of value
def filter_by_role(self, role: MessageRole) → list[MessageItem]
Get all message items with a specific role.
Parameters
roleThe message role to filter by.
def filter_by_type(self, item_type: type[ConversationItem]) → list[ConversationItem]
Get all items of a specific type.
Parameters
item_typeThe ConversationItem subclass to filter by.
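Example
A sketch of slicing the history by role and by item type; MessageRole.USER and FunctionCallItem are assumptions about the package's item types and where they are imported from:
>>> user_turns = chat.items.filter_by_role(MessageRole.USER)
>>> tool_requests = chat.items.filter_by_type(FunctionCallItem)
>>> print(len(user_turns), len(tool_requests))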
def index(
self,
value,
start = 0,
stop = None
)
S.index(value, [start, [stop]]) -> integer -- return first index of value. Raises ValueError if the value is not present.
Supporting start and stop arguments is optional, but recommended.
class Hook
Base class for generation hooks.
Implement any subset of these methods to receive callbacks during generation. All methods have default no-op implementations, so you only need to override the ones you care about.
Methods are called in this order:
1. on_generation_start - Before Zig generation begins
2. on_first_token - When first token arrives (streaming) or N/A (non-streaming)
3. on_generation_end - After generation completes (success or error)
Thread Safety
Hook methods may be called from different threads for concurrent generations. If your hook maintains state, ensure it's thread-safe.
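Example
A minimal latency-logging hook, as a sketch; the print calls stand in for a real metrics sink, and the instance attribute is only safe for one generation at a time (see Thread Safety above):
>>> import time
>>> class LatencyHook(Hook):
...     def on_generation_start(self, chat, input_text, config=None):
...         self._t0 = time.perf_counter()
...     def on_first_token(self, chat, time_ms):
...         print(f"TTFT: {time_ms:.1f} ms")
...     def on_generation_end(self, chat, response, error=None):
...         total_ms = (time.perf_counter() - self._t0) * 1000
...         print(f"generation {'failed' if error else 'ok'} in {total_ms:.1f} ms")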
Quick Reference
Methods
| Method | Description |
|---|---|
| on_first_token() | Handle first token event (streaming only). |
| on_generation_end() | Handle generation end event (success or error). |
| on_generation_start() | Handle generation start event. |
Methods
def on_first_token(
self,
chat: Chat | AsyncChat,
time_ms: float
) → None
Handle first token event (streaming only).
This is the Time-To-First-Token (TTFT) measurement point, critical for perceived latency in interactive applications.
Parameters
chatThe Chat instance.
time_msMilliseconds since generation_start.
Only called for streaming responses. For non-streaming, TTFT is effectively the same as total latency.
Example
>>> def on_first_token(self, chat, time_ms):
... metrics.histogram("llm.ttft", time_ms)
def on_generation_end(
self,
chat: Chat | AsyncChat,
response: Response | None,
error: Exception | None = None
) → None
Handle generation end event (success or error).
Parameters
chatThe Chat instance.
responseThe Response object (if successful), or None if error.
errorThe exception (if generation failed), or None if successful.
Example
>>> def on_generation_end(self, chat, response, error=None):
... if error:
... metrics.counter("llm.errors", 1)
... else:
... metrics.counter("llm.tokens", response.usage.total_tokens)
def on_generation_start(
self,
chat: Chat | AsyncChat,
input_text: str,
config: Any = None
) → None
Handle generation start event.
Parameters
chatThe Chat instance initiating generation.
input_textThe user's input message.
configThe GenerationConfig for this request (if any).
Example
>>> def on_generation_start(self, chat, input_text, config=None):
... self.start_time = time.perf_counter()
... self.input_tokens = len(input_text.split()) # Rough estimate
class HookManager
HookManager(self, hooks: list[Hook] | None = None)
Hook dispatcher for generation lifecycle events.
Used internally by Client to dispatch hook calls. Users register hooks on the Client rather than interacting with this class directly.
Quick Reference
Properties
| Name | Type |
|---|---|
| hooks | list[Hook] |
Methods
| Method | Description |
|---|---|
| add() | Add a hook to the manager. |
| dispatch_end() | Dispatch on_generation_end to all hooks. |
| dispatch_first_token() | Dispatch on_first_token to all hooks. |
| dispatch_start() | Dispatch on_generation_start to all hooks. |
| remove() | Remove a hook from the manager. |
Properties
hooks: list[Hook]
Return the list of registered hooks.
Methods
def add(self, hook: Hook) → None
Add a hook to the manager.
Parameters
hookHook instance to register.
def dispatch_end(
self,
chat: Chat | AsyncChat,
response: Response | None,
error: Exception | None = None
) → None
Dispatch on_generation_end to all hooks.
Parameters
chatThe Chat or AsyncChat instance.
responseThe completed response, or None on error.
errorThe exception that occurred, if any.
def dispatch_first_token(
self,
chat: Chat | AsyncChat,
time_ms: float
) → None
Dispatch on_first_token to all hooks.
Parameters
chatThe Chat or AsyncChat instance.
time_msTime to first token in milliseconds.
def dispatch_start(
self,
chat: Chat | AsyncChat,
input_text: str,
config: Any = None
) → None
Dispatch on_generation_start to all hooks.
Parameters
chatThe Chat or AsyncChat instance.
input_textThe user's input text.
configGeneration configuration, if any.
def remove(self, hook: Hook) → None
Remove a hook from the manager.
Parameters
hookHook instance to unregister.
class Token
Single token from a streaming response.
Token is returned during streaming iteration. It behaves exactly like a string for casual use (print, concatenation, etc.) but also carries per-token metadata when logprobs, token IDs, or stop reason detection are needed.
Attributes
idThe token ID from the tokenizer vocabulary.
logprobLog probability of this token (if logprobs were requested), or None.
is_specialTrue if this is a special token (EOS, BOS, etc.).
finish_reasonIf this is the last token, why generation stopped. Otherwise None. Possible values: "eos_token", "length", "stop_sequence", "tool_calls".
Example
>>> for token in chat("Hello", stream=True):
... print(token, end="", flush=True)
Example (with metadata)
>>> for token in chat("Hello", stream=True):
... if token.logprob is not None and token.logprob < -5.0:
... ui.highlight_uncertain(token)
... print(token, end="")
Token instances are immutable (like str). Metadata is set at construction and cannot be modified afterward.
class Usage
Usage(
self,
prompt_tokens: int,
completion_tokens: int,
total_tokens: int
)
Token usage statistics.
Attributes
prompt_tokensTokens in the input prompt.
completion_tokensTokens in the generated response.
total_tokensTotal tokens (prompt + completion).
class Timings
Timings(
self,
prefill_ms: float,
generation_ms: float,
tokens_per_second: float
)
Generation timing breakdown.
Provides detailed performance metrics for generation, useful for profiling, optimization, and monitoring latency in production.
Attributes
prefill_msTime to process the prompt (milliseconds). This is the "time to first token" - how long before generation starts.
generation_msTime to generate all tokens (milliseconds). This is the decode phase - actual token generation time.
tokens_per_secondGeneration throughput (tokens/sec). Calculated as completion_tokens / (generation_ms / 1000).
Example
>>> response = chat("Tell me a story")
>>> if response.timings:
... print(f"Prefill: {response.timings.prefill_ms:.1f}ms")
... print(f"Generation: {response.timings.generation_ms:.1f}ms")
... print(f"Speed: {response.timings.tokens_per_second:.1f} tok/s")
Quick Reference
Methods
| Method | Description |
|---|---|
| from_ns() | Create Timings from nanosecond values. |
Methods
def from_ns(
cls,
prefill_ns: int,
generation_ns: int,
token_count: int
) → Timings
Create Timings from nanosecond values.
Parameters
prefill_nsPrefill time in nanoseconds.
generation_nsGeneration time in nanoseconds.
token_countNumber of tokens generated.
Returns
Timings instance with millisecond values and throughput.
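Example
A small sketch of the nanosecond-to-millisecond conversion (exact float formatting may differ):
>>> t = Timings.from_ns(prefill_ns=120_000_000, generation_ns=2_000_000_000, token_count=100)
>>> (t.prefill_ms, t.generation_ms, t.tokens_per_second)
(120.0, 2000.0, 50.0)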
class FinishReason
Constants for generation stop reasons.
Attributes
EOS_TOKENEnd-of-sequence token generated.
LENGTHMaximum token limit reached.
STOP_SEQUENCEUser-defined stop sequence matched.
TOOL_CALLSModel requested tool execution.
CANCELLEDRequest was cancelled (client disconnect, stop flag set).
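Example
A sketch of reacting to a truncated generation; max_tokens here is an illustrative per-call override:
>>> response = await chat.send("Write a long essay", max_tokens=32)
>>> if response.finish_reason == FinishReason.LENGTH:
...     print("Hit the token limit - raise max_tokens or continue the turn.")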
class TokenLogprob
TokenLogprob(
self,
token: int,
token_str: str,
logprob: float,
top_logprobs: list[tuple[int, str, float]] | None = None
)
Log probability for a single token.
Attributes
tokenToken ID.
token_strToken as string.
logprobLog probability.
top_logprobsAlternative tokens at this position.
class ResponseMetadata
ResponseMetadata(
self,
finish_reason: str,
schema_tokens: int = 0,
schema_injection: str | None = None,
grammar_gbnf: str | None = None,
grammar_trace: list[str] | None = None,
prefill_success: bool | None = None
)
Generation metadata and debug information.
class ResponseFormat
ResponseFormat(
self,
type: str = 'text',
json_schema: dict | None = None
)
Structured output format specification.
Used to constrain generation to produce valid JSON matching a schema.
Attributes
typeThe format type ("text" or "json_object").
json_schemaJSON Schema dict for structured output (when type="json_object").
Example
>>> config = GenerationConfig(
... response_format=ResponseFormat(
... type="json_object",
... json_schema={"type": "object", "properties": {"name": {"type": "string"}}}
... )
... )
class ToolCall
ToolCall(
self,
id: str,
type: str,
function: ToolCallFunction,
_func: Callable[..., Any] | None = None
)
Tool call requested by the model.
Follows the OpenAI tool call format for compatibility with agent frameworks and tool-calling workflows.
Attributes
idUnique identifier for this tool call.
typeAlways "function" for function calls.
functionThe function details (name and arguments).
Example
>>> if response.tool_calls:
... for tool in response.tool_calls:
... print(f"Call: {tool.function.name}")
... args = tool.function.arguments_parsed()
... result = execute_tool(tool.function.name, args)
Quick Reference
Properties
| Name | Type |
|---|---|
| arguments | str |
| name | str |
Methods
| Method | Description |
|---|---|
| create() | Create a ToolCall with the given parameters. |
| execute() | Execute the tool call by invoking the mapped Py... |
| execute_async() | Execute the tool call asynchronously. |
Properties
arguments: str
Convenience access to function arguments.
name: str
Convenience access to function name.
Methods
def create(
cls,
id: str,
name: str,
arguments: str
) → ToolCall
Create a ToolCall with the given parameters.
def execute(self) → Any
Execute the tool call by invoking the mapped Python function.
Returns
The return value of the tool function.
Raises
ToolExecutionErrorIf no function is mapped to this tool call.
def execute_async(self) → Any
Execute the tool call asynchronously.
Awaits coroutine functions directly. Runs sync functions in an executor to avoid blocking the event loop.
Returns
The return value of the tool function.
Raises
ToolExecutionErrorIf no function is mapped to this tool call.
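Example
A sketch of executing every requested call without blocking the event loop, then feeding the results back; it assumes the response is attached to an AsyncChat session:
>>> for call in response.tool_calls or []:
...     result = await call.execute_async()
...     response = await response.submit_tool_result(call.id, result)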
class ToolCallFunction
ToolCallFunction(
self,
name: str,
arguments: str
)
Function name and arguments within a tool call.
Attributes
nameName of the function to call.
argumentsJSON string of arguments to pass.
Quick Reference
Methods
| Method | Description |
|---|---|
| arguments_parsed() | Parse arguments as dict |
Methods
def arguments_parsed(self) → dict
Parse arguments as dict. Returns empty dict on parse failure.
class ToolResult
ToolResult(
self,
tool_call_id: str,
content: str,
is_error: bool = False
)
Result of a tool execution.
Added back to the conversation history so the model can incorporate the tool output in its next response.
Attributes
tool_call_idID of the tool call this is responding to.
contentThe tool's output/result.
is_errorWhether this result represents an error.
Example
>>> # Execute tool and add result
>>> result = execute_tool(tool.function.name, tool.function.arguments_parsed())
>>> # Tool results are added to the conversation automatically during generation
Quick Reference
Methods
| Method | Description |
|---|---|
| to_message() | Convert to OpenAI message format. |
Methods
def to_message(self) → dict
Convert to OpenAI message format.
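Example
A sketch of building a result by hand; the exact keys of the returned dict follow the OpenAI tool-message convention and are shown here as an assumption:
>>> result = ToolResult(tool_call_id=call.id, content="72°F and sunny")
>>> message = result.to_message()  # e.g. {"role": "tool", "tool_call_id": ..., "content": ...}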
class ToolState
ToolState(
self,
status: str,
input: dict | None = None,
title: str | None = None,
output: str | None = None,
error: str | None = None,
metadata: dict | None = None,
time_start: float | None = None,
time_end: float | None = None
)
Tool execution state for streaming UIs.
Provides state tracking for live UI updates during tool execution.
Attributes
statusCurrent status (pending, running, completed, error).
titleHuman-readable title for UI display.
inputParsed input arguments (dict, not JSON string).
outputTool result (when completed).
errorError message (when error).
metadataAdditional metadata for UI display.
time_startWhen execution started (Unix timestamp).
time_endWhen execution ended (Unix timestamp).
Example - Streaming updates
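A hedged sketch of rendering a ToolState for a live UI; the status strings follow the values listed above, which ToolStatus exposes as constants:
>>> def render(state: ToolState) -> str:
...     if state.status == "running":
...         return f"running: {state.title or 'tool'}"
...     if state.status == "completed":
...         return f"done: {state.output}"
...     if state.status == "error":
...         return f"failed: {state.error}"
...     return "pending"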
class ToolStatus
Constants for tool execution status.