Client
Entry point for LLM inference
Classes
| Class | Description |
|---|---|
| Client | Entry point for LLM inference. |
| AsyncClient | Async entry point for LLM inference. |
class Client
Client(
self,
model: ModelInput | list[ModelInput],
profile: Profile | None = None,
hooks: list[Hook] | None = None,
base_url: str | None = None,
api_key: str | None = None,
org_id: str | None = None,
timeout_ms: int | None = None,
max_retries: int | None = None,
gpu_layers: int | None = None,
use_mmap: bool | None = None,
num_threads: int | None = None
)
Entry point for LLM inference.
Concurrency
Not thread-safe. Create one Client per thread, or use separate Chat instances per thread (Chat instances are independent).
Manages model connections and creates conversations. Supports single models, multiple models for routing, and various backends (local, OpenAI, Anthropic, vLLM, etc.).
Multiple Client instances for the same model share the underlying engine. Subsequent Client creation after the first model load is inexpensive.
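Following the Concurrency note above, a minimal sketch of the per-thread Chat pattern; the threading scaffolding is illustrative, and only Client and chat() come from this API:
>>> import threading
>>> client = Client("Qwen/Qwen3-0.6B")
>>> chats = [client.chat() for _ in range(2)]  # one Chat per worker thread
>>> def worker(chat, prompt):
...     print(chat(prompt, stream=False))  # each thread touches only its own Chat
>>> threads = [
...     threading.Thread(target=worker, args=(c, p))
...     for c, p in zip(chats, ["Hello!", "Hi there!"])
... ]
>>> for t in threads: t.start()
>>> for t in threads: t.join()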
Parameters
model - Model identifier(s). Can be:
- str: Simple model ID ("Qwen/Qwen3-0.6B", "openai::gpt-4o")
- ModelSpec: Structured config for advanced backend settings
- list: Multiple models for routing/fan-out (coming soon)
profile - Optional storage profile inherited by all chats created by this client. If provided, chats persist to ~/.talu/db/<profile>/ by default.
hooks - List of Hook instances for observability (metrics, logging, tracing). Hooks receive callbacks at generation start, first token (TTFT), and end.
base_url - API endpoint URL. When provided, uses OpenAI-compatible backend.
api_key - API key for remote backends. Falls back to environment variables (OPENAI_API_KEY, ANTHROPIC_API_KEY, etc.).
org_id - Organization ID for remote backends.
timeout_ms - Request timeout in milliseconds for remote backends.
max_retries - Maximum retry attempts for remote backends.
gpu_layers - Number of layers to offload to GPU (-1 for all). Local backend only.
use_mmap - Use memory-mapped files for model loading. Local backend only.
num_threads - Number of threads for inference (0 for auto). Local backend only.
Attributes
models - List of available model identifiers.
default_model - The default model used for generation.
hooks - The HookManager for adding/removing hooks after construction.
Example - Single model
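A minimal sketch of the single-model path, using only the constructor, chat(), and close() documented on this page:
>>> from talu import Client
>>> client = Client("Qwen/Qwen3-0.6B")
>>> chat = client.chat(system="You are helpful.")
>>> response = chat("Hello!", stream=False)
>>> print(response)
>>> client.close()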
Example - Remote backend (Pythonic)
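A sketch assuming an OpenAI-compatible server; the endpoint and model name are placeholders, while base_url and api_key are the Pythonic parameters described above:
>>> client = Client(
...     "gpt-4o",
...     base_url="http://localhost:8080/v1",
...     api_key="YOUR_API_KEY",  # or rely on OPENAI_API_KEY etc. from the environment
... )
>>> print(client.ask("Hello!"))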
Example - Local backend with GPU offload
>>> client = Client("Qwen/Qwen3-0.6B", gpu_layers=20, num_threads=4)
Example - Advanced config via ModelSpec (power users)
>>> from talu import Client
>>> from talu.router import ModelSpec, OpenAICompatibleBackend
>>>
>>> spec = ModelSpec(
... ref="my-model",
... backend=OpenAICompatibleBackend(
... base_url="http://localhost:8080/v1",
... timeout_ms=5000
... )
... )
>>> client = Client(spec)
Example - Multi-user serving
>>> client = Client("Qwen/Qwen3-0.6B")
>>> alice = client.chat(system="You are helpful.")
>>> bob = client.chat(system="You are a pirate.")
>>> response = alice("Hello!")
>>> response = bob("Ahoy!")
>>> client.close()
Raises
ValidationError - If no models are provided.
Example - Context manager
>>> with Client("Qwen/Qwen3-0.6B") as client:
... chat = client.chat()
... response = chat("Hello!")
Quick Reference
Properties
| Name | Type |
|---|---|
| default_model | str |
| hooks | HookManager |
| models | list[str] |
| router | Router |
Methods
| Method | Description |
|---|---|
| ask() | Stateless text completion (non-streaming). |
| capabilities() | Return backend capabilities for a model. |
| chat() | Create a Chat session for multi-turn conversations. |
| close() | Close client and release resources. |
| embed() | Extract embedding from text. |
| embed_batch() | Extract embeddings from multiple texts. |
| embedding_dim() | Get the embedding dimension for a model. |
| raw_complete() | Raw completion without chat templates. |
| stream() | Stream a stateless completion. |
Properties
default_model: str
The default model used when none is specified.
hooks: HookManager
The HookManager for observability.
Use this to add or remove hooks after Client construction:
>>> client = Client("model")
>>> client.hooks.add(MetricsHook())
>>> client.hooks.remove(my_hook)
Returns
HookManager instance.
models: list[str]
List of available model identifiers.
router: Router
The Router instance (for advanced use).
Methods
def ask(
self,
prompt: str,
model: str | None = None,
config: GenerationConfig | None = None,
**kwargs: Any
) → Response
Stateless text completion (non-streaming).
Generates a complete response for a single prompt without conversation history. This method always returns Response (not StreamingResponse) and waits for generation to finish before returning.
This is the efficient way to do one-shot generation when you have an existing Client instance (e.g., in a production server or batch processing). No Chat object overhead - it just calls chat.send() directly.
For multi-turn conversations with history and append(), use client.chat().
Streaming vs Non-Streaming
This method always returns Response (non-streaming). For streaming behavior with real-time token feedback, use:
- client.chat() with default stream=True (Chat object, multi-turn)
- chat.send(prompt, stream=True) (one-shot streaming via Chat)
Use client.ask() when:
- You need the full response immediately (batch processing, API endpoints)
- You don't care about intermediate tokens
- You want Response object with metadata access
- You're doing multiple one-shot calls efficiently (reuses Client)
Use client.chat() with stream=True when:
- You want real-time feedback (interactive applications)
- You're building a chat interface with live updates
- Long generations where progress matters
- Reducing perceived latency for user-facing apps
Why client.ask() is non-streaming:
One-shot completions are typically used for batch processing or API endpoints where you need the complete result immediately. Adding streaming adds complexity (iterators, caching) without benefit for these use cases.
Parameters
prompt - The input prompt.
model - Model to use (overrides default_model).
config - Generation configuration.
**kwargs - Generation overrides.
Returns
Response object (str-able, with metadata access).
Raises
StateError - If client has been closed.
GenerationError - If generation fails.
Example
>>> from talu import Client
>>> client = Client("Qwen/Qwen3-0.6B")
>>> # One-shot completions (efficient, reuses Client)
>>> response = client.ask("What is 2+2?")
>>> print(response)
4
>>> # Multiple one-shot calls (no history, fast)
>>> for task in ["What is 3+3?", "What is 4+4?"]:
... print(client.ask(task))
>>> # For streaming with real-time feedback, use chat()
>>> chat = client.chat()
>>> for token in chat("Tell me a story"):
... print(token, end="", flush=True)
def capabilities(self, model: str | ModelSpec | None = None) → Capabilities
Return backend capabilities for a model.
Parameters
model - Model to query, or None for default.
Returns
Capabilities object with backend feature flags.
Raises
StateError - If the client has been closed.
ValidationError - If model_input format is invalid.
TaluError - If capability retrieval fails.
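Example
A minimal sketch (no example is given above); the Capabilities fields are not documented here, so the object is simply printed:
>>> caps = client.capabilities()  # query the default model
>>> print(caps)  # backend feature flags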
def chat(
self,
system: str | None = None,
messages: list[dict] | None = None,
model: str | None = None,
config: GenerationConfig | None = None,
session_id: str | None = None,
parent_session_id: str | None = None,
marker: str = '',
metadata: dict | None = None,
chat_template: str | PromptTemplate | None = None,
offline: bool = False
) → Chat
Create a Chat session for multi-turn conversations.
Returns a Chat object that stores conversation history and allows generation with append() support.
Streaming Behavior
Chat instances default to streaming (stream=True) when called via chat(prompt) or chat()(prompt). This returns a StreamingResponse with tokens arriving incrementally, providing immediate feedback and matching the industry standard for chat interfaces.
To disable streaming and get a complete Response after generation finishes, pass stream=False:
>>> chat = client.chat()
>>> response = chat("Hello!", stream=False) # Response (non-streaming)
>>> print(response.text) # Immediately available
For streaming (default):
>>> for token in chat("Hello!"): # StreamingResponse
... print(token, end="", flush=True)
Parameters
system - System prompt.
messages - Initial message history (for restoring sessions).
model - Default model for this chat (not yet supported).
config - Default generation config for this chat. Profile persistence (if configured on Client) is also applied.
session_id - Optional session identifier for this chat.
parent_session_id - Optional parent session identifier for forks.
marker - Session marker for storage backends (default: "" = normal/unmarked). Values: "pinned", "archived", "deleted", or "" (normal).
metadata - Optional session metadata dict.
chat_template - Custom chat template (None uses model default, use explicit None for raw completion without template formatting).
offline - If True, disallow network access when resolving model URIs.
Returns
Chat object for multi-turn interaction.
Raises
StateError - If the client has been closed.
NotImplementedError - If messages parameter is provided (not yet supported).
Example
>>> chat = client.chat(system="You are a pirate.")
>>> response = chat("Hello!")
>>> response = response.append("Tell me about your ship")
>>> print(chat.items) # Full history
def close(self) → None
Close client and release resources.
Closes all model connections. After calling close(), the client cannot be used for generation. Safe to call multiple times.
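Example
A short sketch of explicit cleanup when the context-manager form shown earlier is not convenient:
>>> client = Client("Qwen/Qwen3-0.6B")
>>> try:
...     print(client.ask("What is 2+2?"))
... finally:
...     client.close()  # safe to call multiple times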
def embed(
self,
text: str,
model: str | None = None,
pooling: str = 'last',
normalize: bool = True
) → list[float]
Extract embedding from text.
Runs the full transformer forward pass and returns pooled hidden states as a dense vector embedding. Useful for semantic search, RAG, and document similarity.
Parameters
text - Input text to embed.
model - Model to use (overrides default_model).
pooling - Pooling strategy:
- "last" (default): Last token's hidden state (best for decoder models)
- "mean": Average of all token hidden states
- "first": First token (CLS token for BERT-style models)
normalize - Whether to L2-normalize the output embedding. Default True.
Returns
List of floats representing the embedding vector. Length equals the model's hidden dimension (d_model).
Raises
StateError - If the client has been closed.
ValidationError - If pooling strategy is invalid.
GenerationError - If embedding extraction fails.
Example
>>> client = Client("Qwen/Qwen3-0.6B")
>>> embedding = client.embed("Hello, world!")
>>> len(embedding) # d_model (e.g., 1024)
Example - Semantic similarity
>>> emb1 = client.embed("The cat sat on the mat")
>>> emb2 = client.embed("A feline rested on the rug")
>>> similarity = sum(a*b for a, b in zip(emb1, emb2)) # cosine sim (if normalized)
def embed_batch(
self,
texts: list[str],
model: str | None = None,
pooling: str = 'last',
normalize: bool = True
) → list[list[float]]
Extract embeddings from multiple texts.
Parameters
texts - List of input texts to embed.
model - Model to use (overrides default_model).
pooling - Pooling strategy ("last", "mean", "first").
normalize - Whether to L2-normalize the output embeddings.
Returns
List of embedding vectors, one per input text.
Raises
StateError - If the client has been closed.
Example
>>> embeddings = client.embed_batch([
... "First document",
... "Second document",
... ])
def embedding_dim(self, model: str | None = None) → int
Get the embedding dimension for a model.
Parameters
model - Model to query, or None for default.
Returns
The embedding dimension (d_model).
Raises
StateError - If the client has been closed.
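Example
A minimal sketch pairing embedding_dim() with embed(); per the descriptions above, the embedding length equals the model's hidden dimension:
>>> dim = client.embedding_dim()
>>> len(client.embed("Hello, world!")) == dim
True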
def raw_complete(
self,
prompt: str,
system: str | None = None,
model: str | None = None,
config: GenerationConfig | None = None,
completion_opts: CompletionOptions | None = None,
**kwargs: Any
) → Response
Raw completion without chat templates.
Sends prompt directly to model without formatting. This is a technical use case for prompt engineering and advanced control. Most users should use client.chat().send() with chat templates instead.
The ONLY difference from chat-based completion
- Chat uses the model's chat template (adds role markers)
- raw_complete() does NOT apply any template (sends raw prompt)
Raw-only options (available ONLY via completion_opts parameter):
- token_ids: Send pre-tokenized input, bypassing tokenizer.
- continue_from_token_id: Force continuation from a specific token ID.
- echo_prompt: Return input + output combined.
These options don't make sense with chat-formatted prompts and are intentionally EXCLUDED from chat-based APIs to keep them clean.
Parameters
prompt - The raw prompt (sent exactly as-is, no formatting).
system - Optional system prompt.
model - Model to use (overrides default_model).
config - Generation configuration.
completion_opts - Raw-completion options (CompletionOptions).
**kwargs - Additional generation overrides (for any backend options not yet in CompletionOptions).
Returns
Response object.
Raises
StateError - If client has been closed.
GenerationError - If generation fails.
Example
>>> client = Client("Qwen/Qwen3-0.6B")
>>> # Raw completion
>>> response = client.raw_complete("Continue: The sky is")
>>> print(response)
blue due to Rayleigh scattering.
>>> # With CompletionOptions
>>> from talu.router import CompletionOptions
>>> opts = CompletionOptions(
... token_ids=[1234, 5678],
... continue_from_token_id=151645
... )
>>> response = client.raw_complete("Continue: ", completion_opts=opts)
def stream(
self,
prompt: str,
model: str | None = None,
config: GenerationConfig | None = None,
**kwargs: Any
) → Iterator[str]
Stream a stateless completion.
Parameters
prompt - The input prompt.
model - Model to use (not yet supported).
config - Generation configuration.
**kwargs - Generation overrides.
Yields
Text chunks as they are generated.
Raises
StateError - If the client has been closed.
GenerationError - If generation fails.
Example
>>> for chunk in client.stream("Tell me a story"):
... print(chunk, end="", flush=True)
class AsyncClient
AsyncClient(
self,
model: ModelInput | list[ModelInput],
hooks: list[Hook] | None = None,
base_url: str | None = None,
api_key: str | None = None,
org_id: str | None = None,
timeout_ms: int | None = None,
max_retries: int | None = None,
gpu_layers: int | None = None,
use_mmap: bool | None = None,
num_threads: int | None = None
)
Async entry point for LLM inference.
Async equivalent of Client for non-blocking inference (FastAPI, aiohttp, etc.). All generation methods are async and must be awaited.
Wraps the same engine as Client. Model weights are cached globally, so creating AsyncClient for the same model as an existing Client shares the underlying engine.
Concurrency
Safe to share across asyncio tasks. Not thread-safe across OS threads.
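Per the note above, a sketch of one AsyncClient shared across asyncio tasks; asyncio.gather is standard library, and only ask() comes from this API:
>>> import asyncio
>>> async def main():
...     async with AsyncClient("Qwen/Qwen3-0.6B") as client:
...         answers = await asyncio.gather(
...             client.ask("What is 2+2?"),
...             client.ask("What is 3+3?"),
...         )
...         for answer in answers:
...             print(answer)
>>> asyncio.run(main())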
Parameters
model - Model identifier(s). Can be:
- str: Simple model ID ("Qwen/Qwen3-0.6B", "openai::gpt-4o")
- ModelSpec: Structured config for advanced backend settings
- list: Multiple models for routing/fan-out (coming soon)
hooks - List of Hook instances for observability (metrics, logging, tracing).
base_url - API endpoint URL. When provided, uses OpenAI-compatible backend.
api_key - API key for remote backends. Falls back to environment variables (OPENAI_API_KEY, ANTHROPIC_API_KEY, etc.).
org_id - Organization ID for remote backends.
timeout_ms - Request timeout in milliseconds for remote backends.
max_retries - Maximum retry attempts for remote backends.
gpu_layers - Number of layers to offload to GPU (-1 for all). Local backend only.
use_mmap - Use memory-mapped files for model loading. Local backend only.
num_threads - Number of threads for inference (0 for auto). Local backend only.
Example - Basic async usage
>>> async with AsyncClient("Qwen/Qwen3-0.6B") as client:
... response = await client.ask("What is 2+2?")
... print(response)
Example - Remote backend (Pythonic)
>>> async with AsyncClient("gpt-4", base_url="http://localhost:8080/v1") as client:
... response = await client.ask("Hello!")
Example - Advanced config via ModelSpec (power users)
>>> from talu import AsyncClient
>>> from talu.router import ModelSpec, OpenAICompatibleBackend
>>>
>>> spec = ModelSpec(
... ref="my-model",
... backend=OpenAICompatibleBackend(
... base_url="http://localhost:8080/v1",
... timeout_ms=5000
... )
... )
>>> async with AsyncClient(spec) as client:
... response = await client.ask("Hello!")
Example - Async streaming
>>> async with AsyncClient("Qwen/Qwen3-0.6B") as client:
... async for chunk in client.stream("Tell me a story"):
... print(chunk, end="", flush=True)
Raises
ValidationError - If no models are provided.
Example - Multi-user async serving
>>> client = AsyncClient("Qwen/Qwen3-0.6B")
>>> alice = client.chat(system="You are helpful.")
>>> bob = client.chat(system="You are a pirate.")
>>> response = await alice("Hello!")
>>> response = await bob("Ahoy!")
>>> await client.close()
Quick Reference
Properties
| Name | Type |
|---|---|
| default_model | str |
| hooks | HookManager |
| models | list[str] |
| router | Router |
Methods
| Method | Description |
|---|---|
| ask() | Async stateless text completion. |
| capabilities() | Return backend capabilities for a model. |
| chat() | Create a new AsyncChat instance. |
| close() | Close client and release resources. |
| embed() | Extract embedding from text (synchronous). |
| embed_batch() | Extract embeddings from multiple texts (synchronous). |
| embedding_dim() | Get the embedding dimension for a model. |
| stream() | Async stream a stateless completion. |
Properties
default_model: str
The default model used when none is specified.
hooks: HookManager
The HookManager for observability.
Use this to add or remove hooks after AsyncClient construction.
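Mirroring the Client.hooks example above (MetricsHook and my_hook are placeholder hook instances):
>>> client = AsyncClient("model")
>>> client.hooks.add(MetricsHook())
>>> client.hooks.remove(my_hook)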
Returns
HookManager instance.
models: list[str]
List of available model identifiers.
router: Router
The Router instance (for advanced use).
Methods
def ask(
self,
prompt: str,
model: str | None = None,
config: GenerationConfig | None = None,
**kwargs: Any
) → AsyncResponse
Async stateless text completion.
Generates a response for a single prompt without conversation history. For multi-turn conversations, use chat() instead.
Parameters
prompt - The input prompt.
model - Model to use (overrides default_model).
config - Generation configuration.
**kwargs - Generation overrides.
Returns
AsyncResponse object (str-able, with metadata access).
Raises
StateError - If the client has been closed.
GenerationError - If generation fails.
Example
>>> response = await client.ask("What is 2+2?")
>>> print(response)
def capabilities(self, model: str | ModelSpec | None = None) → Capabilities
Return backend capabilities for a model.
Parameters
model - Model to query, or None for default.
Returns
Capabilities object with backend feature flags.
Raises
StateError - If the client has been closed.
ValidationError - If model_input format is invalid.
TaluError - If capability retrieval fails.
def chat(
self,
system: str | None = None,
messages: list[dict] | None = None,
model: str | None = None,
config: GenerationConfig | None = None,
session_id: str | None = None,
parent_session_id: str | None = None,
marker: str = '',
metadata: dict | None = None,
chat_template: str | PromptTemplate | None = None,
offline: bool = False
) → AsyncChat
Create a new AsyncChat instance.
Note: This method itself is synchronous - it just creates the AsyncChat object. The generation methods on AsyncChat are async.
Parameters
system - System prompt.
messages - Initial message history (for restoring sessions).
model - Default model for this chat (not yet supported).
config - Default generation config for this chat.
session_id - Optional session identifier for this chat.
parent_session_id - Optional parent session identifier for forks.
marker - Session marker for storage backends (default: "" = normal/unmarked). Values: "pinned", "archived", "deleted", or "" (normal).
metadata - Optional session metadata dict.
chat_template - Custom chat template (None uses model default, use explicit None for raw completion without template formatting).
offline - If True, disallow network access when resolving model URIs.
Returns
AsyncChat object for async multi-turn interaction.
Raises
StateError - If the client has been closed.
NotImplementedError - If messages parameter is provided (not yet supported).
Example
>>> chat = client.chat(system="You are helpful.")
>>> response = await chat("Hello!")
>>> response = await response.append("Tell me more")
def close(self) → None
Close client and release resources.
Closes all model connections. After calling close(), the client cannot be used for generation. Safe to call multiple times.
def embed(
self,
text: str,
model: str | None = None,
pooling: str = 'last',
normalize: bool = True
) → list[float]
Extract embedding from text (synchronous).
Note: This method is synchronous as embedding extraction is typically fast enough to not require async.
Parameters
text - Input text to embed.
model - Model to use (overrides default_model).
pooling - Pooling strategy ("last", "mean", "first").
normalize - Whether to L2-normalize the output embedding.
Returns
List of floats representing the embedding vector.
Raises
StateError - If the client has been closed.
ValidationError - If pooling strategy is invalid.
GenerationError - If embedding extraction fails.
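Example
A short sketch; as noted above, the call is synchronous even on AsyncClient, so no await is needed:
>>> client = AsyncClient("Qwen/Qwen3-0.6B")
>>> embedding = client.embed("Hello, world!")  # no await
>>> len(embedding)  # the model's hidden dimension (d_model)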
def embed_batch(
self,
texts: list[str],
model: str | None = None,
pooling: str = 'last',
normalize: bool = True
) → list[list[float]]
Extract embeddings from multiple texts (synchronous).
Parameters
texts - List of input texts to embed.
model - Model to use (overrides default_model).
pooling - Pooling strategy.
normalize - Whether to L2-normalize.
Returns
List of embedding vectors.
Raises
StateError - If the client has been closed.
def embedding_dim(self, model: str | None = None) → int
Get the embedding dimension for a model.
Parameters
model - Model to query, or None for default.
Returns
The embedding dimension (d_model).
Raises
StateError - If the client has been closed.
def stream(
self,
prompt: str,
model: str | None = None,
config: GenerationConfig | None = None,
**kwargs: Any
) → AsyncIterator[str]
Async stream a stateless completion.
Parameters
prompt - The input prompt.
model - Model to use (not yet supported).
config - Generation configuration.
**kwargs - Generation overrides.
Yields
Text chunks as they are generated.
Raises
StateError - If the client has been closed.
GenerationError - If generation fails.
Example
>>> async for chunk in client.stream("Tell me a story"):
... print(chunk, end="", flush=True)
ModelInput
ModelInput = str | ModelSpec
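Both accepted forms, restating the examples above; the ModelSpec values are placeholders:
>>> from talu import Client
>>> from talu.router import ModelSpec, OpenAICompatibleBackend
>>> client = Client("Qwen/Qwen3-0.6B")  # str form
>>> spec = ModelSpec(
...     ref="my-model",
...     backend=OpenAICompatibleBackend(base_url="http://localhost:8080/v1"),
... )
>>> client = Client(spec)  # ModelSpec form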