Router
Router module - Model routing, backend specification, and generation config
Classes
| Class | Description |
|---|---|
| Router | Routes generation requests to models. |
| ModelTarget | Model target with optional custom endpoint. |
| StreamToken | Streamed token with content classification metadata. |
| GenerationConfig | Configuration for text generation. |
| Grammar | Pre-compiled grammar for structured output. |
| CompletionOptions | Raw-completion-specific options. |
| StopFlag | Thread-safe stop flag for cancelling generation. |
| BackendSpec | Protocol for backend configuration specifications. |
| LocalBackend | Configuration for local inference backend. |
| OpenAICompatibleBackend | Configuration for OpenAI-compatible API backend. |
| ModelSpec | Specification for a model with optional backend configuration. |
| Capabilities | Capability flags for a model backend. |
| RemoteModelInfo | Information about a model from a remote endpoint. |
Functions
| Function | Description |
|---|---|
| list_endpoint_models() | List available models from an OpenAI-compatible endpoint. |
| check_endpoint() | Check if an OpenAI-compatible endpoint is available. |
| get_model_ids() | Get just the model IDs from a remote endpoint. |
class Router
Router(
self,
models: list[str] | list[ModelTarget] | list[ModelSpec] | list[str | ModelSpec],
default_model: str | None = None
)
Routes generation requests to models.
Router holds model targets (names + optional endpoints) and submits generation requests. This is typically created by Client, not by users directly.
Concurrency
Not thread-safe. Create one Router per thread, or use Client which manages Router instances internally.
Example
>>> # Usually created via Client
>>> client = Client("Qwen/Qwen3-0.6B")
>>> # client._router is the Router instance
Quick Reference
Properties
| Name | Type |
|---|---|
| default_model | str |
| models | list[str] |
Methods
| Method | Description |
|---|---|
| close() | Close router and release resources. |
| embed() | Extract embeddings from text. |
| embedding_dim() | Get the embedding dimension for a model. |
| generate() | Generate a response for a Chat. |
| get_endpoint() | Get custom endpoint for a model, if any. |
| set_endpoint() | Set custom endpoint for a model. |
| stream() | Stream a response for a Chat. |
| stream_async() | Async stream a response for a Chat. |
| submit() | Submit a prepared message list for generation. |
Properties
default_model: str
The default model used when none is specified.
models: list[str]
List of available model names.
Methods
def close(self) → None
Close router and release resources.
Waits for any active generation threads to complete before releasing resources to prevent use-after-free crashes.
Note: This only frees this Router's backend handles. The global engine cache in Zig is intentionally NOT cleared, as other Chat/Router instances may still be using those engines. The engine cache is process-level and shared across all instances for performance.
def embed(
self,
text: str,
model: str | None = None,
pooling: str = 'last',
normalize: bool = True
) → list[float]
Extract embeddings from text.
Runs the full transformer forward pass and returns pooled hidden states as a dense vector embedding. Uses the same cached engine as generation.
Parameters
text: Input text to embed.
model: Model to use, or None for default.
pooling: Pooling strategy:
- "last" (default): Last token's hidden state (best for decoder models)
- "mean": Average of all token hidden states
- "first": First token (CLS token for BERT-style models)
normalize: Whether to L2-normalize the output embedding. Default True.
Returns
List of floats representing the embedding vector. Length equals the model's hidden dimension (d_model).
Raises
GenerationError: If embedding extraction fails.
ValidationError: If the pooling strategy is invalid.
Example
>>> router = Router(["Qwen/Qwen3-0.6B"])
>>> embedding = router.embed("Hello, world!")
>>> len(embedding) # d_model (e.g., 1024)
def embedding_dim(self, model: str | None = None) → int
Get the embedding dimension for a model.
Parameters
model: Model to query, or None for default.
Returns
The embedding dimension (d_model).
Raises
GenerationError: If the model cannot be loaded or doesn't support embeddings.
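Example
A minimal sketch, assuming the default model supports embeddings; the returned dimension matches the length of vectors from embed():
>>> router = Router(["Qwen/Qwen3-0.6B"])
>>> dim = router.embedding_dim()
>>> len(router.embed("Hello, world!")) == dim
True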
def generate(
self,
chat: Chat | AsyncChat,
user_message: str | list[dict],
config: GenerationConfig | None = None,
model: str | None = None,
stop_flag: StopFlag | None = None
) → dict[str, Any]
Generate a response for a Chat.
Submits request to Zig C API. Zig handles:
- Adding user_message to Messages
- Routing to correct engine
- Running inference
- Adding assistant response to Messages
Parameters
chat: The Chat instance with conversation history.
user_message: The user's message. Can be:
- A string for simple text messages
- A list of content parts for multimodal input (Open Responses format): [{"type": "input_text", "text": "..."}, {"type": "input_image", "image_url": "data:image/png;base64,..."}]. Or use InputImage/InputAudio/InputVideo classes and normalize_content().
config: Generation configuration.
model: Model to use, or None for default.
stop_flag: Optional StopFlag for cancellation. When signalled, Zig stops generation gracefully on its next decode loop iteration.
Returns
dict with 'text', 'token_count', 'prompt_tokens', 'completion_tokens', 'prefill_ns', 'generation_ns'.
Raises
GenerationError: If generation fails. Error message includes detailed context from Zig (model path, specific failure reason, etc.).
ValidationError: If the specified model is not found in targets.
StateError: If the router has been closed.
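Example
A minimal sketch, assuming a Router and Chat created over the same local model (Routers are usually created via Client):
>>> router = Router(["Qwen/Qwen3-0.6B"])
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> result = router.generate(chat, "What is 2+2?", config=GenerationConfig(max_tokens=32))
>>> result["text"]  # assistant reply; timing is in result["prefill_ns"] / result["generation_ns"]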
def get_endpoint(self, model: str | None = None) → str | None
Get custom endpoint for a model, if any.
Parameters
model: Model name, or None for the default model.
def set_endpoint(
self,
model: str,
endpoint: str | None
) → None
Set custom endpoint for a model.
Parameters
model: Model identifier.
endpoint: Custom endpoint URL, or None to use default.
Raises
ValidationError: If model is not available.
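Example
A minimal sketch; the URL is illustrative and assumes an OpenAI-compatible server:
>>> router = Router(["Qwen/Qwen3-4B-Instruct-2507"])
>>> router.set_endpoint("Qwen/Qwen3-4B-Instruct-2507", "http://localhost:8000/v1")
>>> router.get_endpoint("Qwen/Qwen3-4B-Instruct-2507")
'http://localhost:8000/v1'
>>> router.set_endpoint("Qwen/Qwen3-4B-Instruct-2507", None)  # revert to the default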
def stream(
self,
chat: Chat,
user_message: str | list[dict],
config: GenerationConfig | None = None,
model: str | None = None,
stop_flag: StopFlag | None = None
) → Iterator[StreamToken]
Stream a response for a Chat.
Submits request to Zig C API. Zig handles all message management. Tokens are yielded in real-time as they are generated.
Uses a pull-based iterator API internally for reliable streaming without callback lifetime issues.
Each yielded StreamToken carries text, item_type, and content_type metadata for content classification.
Parameters
chat: The Chat instance with conversation history.
user_message: The user's message. Can be:
- A string for simple text messages
- A list of content parts for multimodal input (Open Responses format): [{"type": "input_text", "text": "..."}, {"type": "input_image", "image_url": "data:image/png;base64,..."}]. Or use InputImage/InputAudio/InputVideo classes and normalize_content().
config: Generation configuration.
model: Model to use, or None for default.
stop_flag: Optional flag to cancel generation mid-stream.
Yields
StreamToken instances with text, item_type, content_type.
Raises
GenerationError: If streaming fails. Error message includes detailed context from Zig (model path, specific failure reason, etc.).
ValidationError: If the specified model is not found in targets.
StateError: If the router has been closed.
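Example
A minimal sketch of the streaming loop, assuming a local model:
>>> router = Router(["Qwen/Qwen3-0.6B"])
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> for token in router.stream(chat, "Tell me a joke"):
...     print(token.text, end="", flush=True)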
def stream_async(
self,
chat: Chat | AsyncChat,
user_message: str | list[dict],
config: GenerationConfig | None = None,
model: str | None = None
) → AsyncIterator[StreamToken]
Async stream a response for a Chat.
True async streaming using a background thread with cancellation support. Tokens are yielded as they are generated, not buffered.
When the async iterator is cancelled (e.g., client disconnect, CancelledError), generation is stopped gracefully via the stop flag mechanism.
Parameters
chat: The Chat instance with conversation history.
user_message: The user's message to add and respond to.
config: Generation configuration.
model: Model to use, or None for default.
Yields
StreamToken instances with text, item_type, content_type.
Raises
StateError: If router has no default model and none specified.
GenerationError: If generation fails (Zig error, model error, etc.).
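Example
A minimal sketch, assuming an asyncio event loop and a local model:
>>> import asyncio
>>> async def run():
...     router = Router(["Qwen/Qwen3-0.6B"])
...     chat = Chat("Qwen/Qwen3-0.6B")
...     async for token in router.stream_async(chat, "Tell me a joke"):
...         print(token.text, end="", flush=True)
>>> asyncio.run(run())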
def submit(
self,
messages: list[dict],
config: GenerationConfig | None = None,
response_format: type | dict | None = None,
chat: Chat | None = None,
model: str | None = None,
stop_flag: StopFlag | None = None,
_kwargs: Any
) → dict[str, Any]
Submit a prepared message list for generation.
Parameters
messages: List of message dicts with 'role' and 'content' keys.
config: Generation configuration.
response_format: Dataclass type or JSON schema dict for structured output.
chat: The Chat instance (required for native backend).
model: Model to use, or None for default.
stop_flag: Optional StopFlag for cancellation.
Returns
dict with 'text', 'token_count', 'prompt_tokens', 'completion_tokens', 'prefill_ns', 'generation_ns'.
Raises
ValidationError: If chat is None or no user message found in messages.
GenerationError: If generation fails.
StateError: If the router has been closed.
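Example
A minimal sketch, assuming a local (native) backend, which requires the chat argument:
>>> router = Router(["Qwen/Qwen3-0.6B"])
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> messages = [
...     {"role": "system", "content": "You are terse."},
...     {"role": "user", "content": "Name one prime number."},
... ]
>>> result = router.submit(messages, chat=chat)
>>> result["text"]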
class ModelTarget
ModelTarget(
self,
model: str,
endpoint: str | None = None,
spec: ModelSpec | None = None
)
Model target with optional custom endpoint.
Attributes
model: Model identifier (e.g., "Qwen/Qwen3-0.6B", "openai::gpt-4o").
endpoint: Optional custom endpoint URL (overrides provider defaults).
spec: The ModelSpec used to create this target.
class StreamToken
StreamToken(
self,
text: str,
item_type: int,
content_type: int
)
Streamed token with content classification metadata.
Each token carries text, item_type, and content_type discriminators from the responses type system, enabling correct SSE event routing and display.
StreamToken is an internal wire format between Router (producer) and Chat's streaming response (consumer). It is never persisted, never user-facing (users see Token), and is constructed only inside Router.stream()/Router.stream_async(). talu.types holds domain concepts (Items, Records, Events); StreamToken is a transient protocol detail that belongs with the code that constructs it.
Attributes
text: Decoded token text.
item_type: Item type discriminator from the responses type system.
content_type: Content type (e.g., ContentType.OUTPUT_TEXT, ContentType.REASONING_TEXT).
class GenerationConfig
GenerationConfig(
self,
max_tokens: int = 256,
temperature: float = 0.7,
top_k: int = 50,
top_p: float = 0.9,
min_p: float = 0.0,
repetition_penalty: float = 1.0,
stop_sequences: list[str] | None = None,
stop_token_ids: list[int] | None = None,
seed: int | None = None,
response_format: ResponseFormat | None = None,
logprobs: bool = False,
top_logprobs: int | None = None,
logit_bias: dict[int, float] | None = None,
chat_template: PromptTemplate | str | None = None,
extra_context: dict | None = None,
tools_json: str | None = None,
tool_choice: str | None = None,
schema_strategy: SchemaStrategy = 'auto',
inject_schema_prompt: bool = True,
allow_thinking: bool = False,
max_thinking_tokens: int = 512,
validation_retries: int = 0,
extra_body: dict | None = None
)
Configuration for text generation.
This dataclass groups all sampling and generation parameters into a single object, making it easy to reuse configurations across multiple calls and reducing argument bloat in method signatures.
Mutability
GenerationConfig is mutable, allowing in-place modification of session defaults:
>>> chat = Chat("model", config=GenerationConfig(temperature=0.7))
>>> chat.config.temperature = 1.2 # Modify for subsequent calls
>>> chat.config.max_tokens = 500
For creating variations without modifying the original, use .override():
>>> base = GenerationConfig(temperature=0.7)
>>> creative = base.override(temperature=1.2) # New config, base unchanged
Design Note
GenerationConfig serves as "Request Configuration" - not just sampling parameters, but all per-request settings including prompting concerns (chat_template, extra_context, schema_strategy, stop_sequences).
This design enables:
1. Session-level defaults via Chat(config=GenerationConfig(...))
2. In-place modification via chat.config.temperature = 0.5
3. Per-request overrides via chat.send(..., schema_strategy="typescript")
4. A/B testing by passing different configs to the same Chat
Parameters like schema_strategy live here (not on Chat) because they are per-request settings rather than properties of the conversation.
Attributes
max_tokens: Maximum number of tokens to generate. Default is 256. One token is roughly 4 characters or 0.75 words in English.
temperature: Controls randomness in generation. Default is 0.7.
- 0.0: Deterministic (greedy decoding)
- 0.1-0.5: Focused and consistent
- 0.7-1.0: Balanced creativity
- 1.0-2.0: More creative and varied
top_k: Limits token selection to the k most likely. Default is 50. Set to 0 to disable top-k filtering.
top_p: Nucleus sampling threshold. Default is 0.9. Selects from the smallest set of tokens whose cumulative probability exceeds this threshold.
min_p: Minimum probability threshold. Default is 0.0 (disabled). Tokens with probability below min_p * max_prob are excluded. Modern alternative to top_p for some use cases.
repetition_penalty: Penalty for repeating tokens. Default is 1.0. Values > 1.0 discourage repetition, < 1.0 encourage it.
stop_sequences: List of strings that stop generation. Default is None. When any of these strings is generated, generation stops immediately. The stop sequence itself is NOT included in the output.
Multi-token sequences are fully supported. Each stop sequence is tokenized and the full token sequence is matched during generation. For example, "User:" will only trigger when the complete sequence is generated, not when just "User" appears.
stop_token_ids: List of token IDs that stop generation. Default is None. When any of these token IDs is generated, generation stops immediately.
This overrides the model's default EOS tokens for this request only. If you want to ADD to the default EOS tokens (not replace them), include the model's eos_token_ids in your list.
Use cases
- Override model's EOS tokens for specific prompts
- Add additional stop tokens (like newline) for line-by-line generation
- Implement custom stop logic without string matching overhead
Example: stop_token_ids=[151645, 198] # EOS + newline for Qwen
seed: Random seed for reproducibility. Default is None (random). Set to a fixed value for deterministic outputs.
response_format: Structured output format specification. Default is None. Use ResponseFormat to constrain output to JSON or regex patterns. Note: Not yet implemented in the runtime.
logprobs: Whether to return token log probabilities. Default is False. When True, GenerationResult.logprobs will contain log probabilities for each generated token.
top_logprobs: Number of top token alternatives to return. Default is None. When set (1-20), returns log probabilities for the top N most likely tokens at each position. Requires logprobs=True.
logit_bias: Dictionary mapping token IDs to bias values. Default is None. Positive values increase the likelihood of a token being sampled, negative values decrease it. Use -100 or lower to effectively ban a token from appearing.
Example: {1234: -100} # Ban token 1234
Example: {5678: 5.0} # Strongly prefer token 5678
chat_template: Custom chat template for this request. Default is None. When set, uses this template instead of the session's chat_template or the model's built-in template. Accepts either a template string or a PromptTemplate object.
This follows standard configuration precedence:
- GenerationConfig.chat_template (per-request) takes priority over
- Chat.chat_template (session-level) which takes priority over
- Model's default template (from tokenizer_config.json)
The template uses Jinja2 syntax with standard variables:
- messages: List of message dicts with 'role' and 'content'
- bos_token, eos_token: Special tokens from model config
- add_generation_prompt: Whether to append assistant prompt
Example: "{% for m in messages %}{{ m.role }}: {{ m.content }}\n{% endfor %}"
extra_context: Additional template variables. Default is None. Dictionary of extra variables to inject into the template context. These become available alongside standard variables (messages, etc.).
Use cases
- Tool definitions: {"tools": [{"name": "search", ...}]}
- System metadata: {"date": "2024-01-15", "user_name": "Alice"}
- Custom flags: {"enable_thinking": True}
Example
>>> config = GenerationConfig(
... extra_context={"tools": [{"name": "calculator"}]}
... )
tools_json: Tool definitions as a JSON array string. Default is None. This is set internally by Chat when tool calling is enabled.
tool_choice: Tool choice directive ("auto", "required", "none", or function name). Default is None and set internally by Chat when tool calling is enabled.
schema_strategy: Strategy for injecting schema into prompts. Default is "auto". Controls how JSON schemas are formatted when using response_format.
- "auto": Automatically select based on model architecture (recommended)
- "typescript": TypeScript interface syntax (best for code-trained models)
- "json_schema": Raw JSON Schema (for older/simpler models)
- "xml_schema": XML-wrapped schema (for Anthropic-style models)
Most users should leave this as "auto". Override only when experimenting
with new models or when you know a specific format works better.
inject_schema_prompt: Whether to inject schema into the prompt. Default is True. When True and response_format is provided, the schema is automatically injected into the system prompt or user message.
Set to False if you've manually included the schema in your prompt
and want grammar enforcement without auto-injection.
allow_thinking: Enable chain-of-thought reasoning mode. Default is False. When True, allows the model to output a <think>...</think> block before the structured response. Useful for complex reasoning tasks.
max_thinking_tokens: Maximum tokens for thinking block. Default is 512. Only applies when allow_thinking=True. Limits the reasoning output to prevent excessive token usage.
validation_retries: Number of automatic retries on SchemaValidationError. Default is 0. When using response_format with Pydantic validators, the grammar ensures syntactically valid JSON but cannot enforce semantic constraints (e.g., field_validator that requires age < 120).
When > 0, if parsing raises SchemaValidationError, the Chat automatically:
1. Appends the error message to conversation history
2. Regenerates with the same grammar constraint
3. Repeats until validation passes or retries exhausted
This closes the loop between grammar (syntactic) and Pydantic (semantic).
Set to 1-3 for most use cases; higher values rarely help.
extra_body: Extra parameters for remote API requests. Default is None. Dictionary of provider-specific parameters that are merged into the request body for OpenAI-compatible APIs. This is the "escape hatch" for using new or provider-specific features not yet in GenerationConfig.
For local inference, this parameter is ignored.
Example: extra_body={"repetition_penalty": 1.1, "top_a": 0.5}
Example
>>> # Creative writing config
>>> creative = GenerationConfig(
... temperature=1.2,
... top_p=0.95,
... max_tokens=500
... )
>>> # Precise/deterministic config
>>> precise = GenerationConfig(
... temperature=0.0,
... max_tokens=100
... )
>>> # JSON extraction config
>>> json_config = GenerationConfig(
... temperature=0.0,
... stop_sequences=["}"],
... max_tokens=200
... )
>>> # With logprobs
>>> config = GenerationConfig(
... logprobs=True,
... top_logprobs=5,
... max_tokens=50
... )
>>> # With logit bias (ban specific tokens)
>>> config = GenerationConfig(
... logit_bias={1234: -100, 5678: -100}, # Ban tokens 1234 and 5678
... max_tokens=100
... )
>>> # Use with Chat
>>> chat = Chat("model", config=precise)
>>> chat.send("What is 2+2?") # Uses precise config
>>> chat.send("Write a poem", config=creative) # Override for this call
Quick Reference
Methods
| Method | Description |
|---|---|
| override() | Create a new config with specified fields overridden. |
| to_dict() | Return a JSON-serializable dict of config fields. |
Methods
def override(self, **kwargs: object) → GenerationConfig
Create a new config with specified fields overridden.
Returns a new GenerationConfig with the specified fields changed. The original config is unchanged.
For in-place modification, assign directly: config.temperature = 0.5
Parameters
**kwargs: Fields to override. Must be valid GenerationConfig fields.
Returns
New GenerationConfig with the specified fields changed.
Example
>>> config = GenerationConfig(temperature=0.7, max_tokens=100)
>>> creative = config.override(temperature=1.2)
>>> creative.temperature
1.2
>>> config.temperature # Original unchanged
0.7
def to_dict(self) → dict
Return a JSON-serializable dict of config fields.
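Example
A minimal sketch; the dict keys are assumed to follow the config field names:
>>> config = GenerationConfig(temperature=0.2, max_tokens=64)
>>> d = config.to_dict()
>>> d["temperature"], d["max_tokens"]
(0.2, 64)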
class Grammar
Grammar(self, response_format: type | dict[str, Any])
Pre-compiled grammar for structured output.
Compiling a JSON schema into a grammar can be expensive. This class allows you to compile once and reuse the grammar across multiple generations, eliminating the compilation overhead.
The grammar validates the schema at compile time - if the schema is invalid, a StructuredOutputError is raised immediately, not at generation time.
Parameters
response_format: A dataclass type or JSON schema dict defining the output structure.
Raises
StructuredOutputError: If the schema is invalid or compilation fails.
Example - Basic usage
>>> from dataclasses import dataclass
>>> from talu.router import Grammar, Chat
>>>
>>> @dataclass
... class Answer:
... value: int
... reasoning: str
...
>>> # Compile grammar once
>>> grammar = Grammar(Answer)
>>>
>>> # Reuse for multiple generations
>>> chat = Chat("Qwen/Qwen3-0.6B")
>>> r1 = chat.send("What is 2+2?", response_format=grammar)
>>> r2 = chat.send("What is 3+3?", response_format=grammar)
Example - Server context (compile at startup)
The Grammar class maintains a handle to native compiled grammar data. When the Grammar instance is garbage collected, the handle is freed automatically. For long-lived grammars (e.g., server context), keep a reference to prevent premature collection.
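A minimal sketch of the compile-at-startup pattern described above; the Ticket dataclass and handler function are illustrative:
>>> from dataclasses import dataclass
>>> from talu.router import Grammar, Chat
>>>
>>> @dataclass
... class Ticket:
...     title: str
...     priority: int
...
>>> TICKET_GRAMMAR = Grammar(Ticket)  # compile once at startup; keep a module-level reference
>>>
>>> def handle_request(chat: Chat, prompt: str):
...     # Reuse the pre-compiled grammar for every request
...     return chat.send(prompt, response_format=TICKET_GRAMMAR)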
Quick Reference
Properties
| Name | Type |
|---|---|
| response_format | type \| dict[str, Any] |
| schema | dict[str, Any] |
Methods
| Method | Description |
|---|---|
| close() | Free the native grammar handle. |
Properties
response_format: type | dict[str, Any]
The original response format passed to the constructor.
This is used to hydrate Response.parsed.
schema: dict[str, Any]
The compiled JSON schema as a dictionary.
Methods
def close(self) → None
Free the native grammar handle.
This method is idempotent - calling it multiple times is safe. After closing, the grammar cannot be used for generation.
class CompletionOptions
CompletionOptions(
self,
token_ids: list[int] | None = None,
continue_from_token_id: int | None = None,
echo_prompt: bool = False
)
Raw-completion-specific options.
These options don't make sense with chat-formatted prompts and are ONLY available via raw_complete().
Use this for technical use cases where you need low-level token control.
Attributes
token_ids: Pre-tokenized input. Bypasses the tokenizer. Use case: external tokenization, token-level testing, sending tokens directly from another process. Does NOT make sense with role-based chat formatting (user/assistant markers).
continue_from_token_id: Continue generation from a specific token ID. Use case: autocomplete systems, prefix completion, debugging token alignment. Does NOT make sense with chat-formatted prompts that expect conversation turns.
echo_prompt: Return the input prompt plus the generated completion combined. Use case: auto-regressive training data generation, completion-style debugging where you need to verify input+output pairing. Chat applications already have the full conversation history in the messages list, so explicit echo doesn't make sense.
Example
>>> from talu.router import CompletionOptions
>>> # Pre-tokenized input
>>> opts = CompletionOptions(
... token_ids=[1234, 5678, 9012],
... continue_from_token_id=151645
... )
>>> response = talu.raw_complete(
... "Qwen/Qwen3-0.6B",
... "Continue: ",
... completion_opts=opts
... )
>>> # Echo mode
>>> opts = CompletionOptions(echo_prompt=True)
>>> response = talu.raw_complete(
... "Qwen/Qwen3-0.6B",
... "Hello",
... completion_opts=opts
... )
class StopFlag
StopFlag(self)
Thread-safe stop flag for cancelling generation.
This class wraps a ctypes bool that can be passed to the Zig core for cooperative cancellation. When signal() is called, the Zig generation loop will detect the flag on its next iteration and stop gracefully.
Usage
stop_flag = StopFlag()
# In generation thread or async task
for token in router.stream(chat, msg, stop_flag=stop_flag):
yield token
# In cancellation handler (e.g., asyncio.CancelledError)
stop_flag.signal()
The stop flag can be reused after calling reset().
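Example
A minimal sketch of signalling and reuse:
>>> flag = StopFlag()
>>> flag.is_set()
False
>>> flag.signal()   # thread-safe; callable from any thread
>>> flag.is_set()
True
>>> flag.reset()    # ready for the next generation
>>> flag.is_set()
False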
Quick Reference
Properties
| Name | Type |
|---|---|
| ptr | int |
Methods
| Method | Description |
|---|---|
| is_set() | Check if the stop flag has been signalled. |
| reset() | Reset the flag to False for reuse. |
| signal() | Signal cancellation (set flag to True). |
Properties
ptr: int
Get the pointer address for passing to C API.
Returns an integer that can be cast to c_void_p.
Methods
def is_set(self) → bool
Check if the stop flag has been signalled.
def reset(self) → None
Reset the flag to False for reuse.
def signal(self) → None
Signal cancellation (set flag to True).
This is thread-safe - can be called from any thread.
class BackendSpec
BackendSpec(
self,
args,
kwargs
)
Protocol for backend configuration specifications.
class LocalBackend
LocalBackend(
self,
gpu_layers: int = -1,
use_mmap: bool = True,
num_threads: int = 0
)
Configuration for local inference backend.
class OpenAICompatibleBackend
OpenAICompatibleBackend(
self,
base_url: str | None = None,
api_key: str | None = None,
org_id: str | None = None,
timeout_ms: int = 0,
max_retries: int = 0,
extra_params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None
)
Configuration for OpenAI-compatible API backend.
Attributes
base_url: Base URL for the API (e.g., "http://localhost:8080/v1").
api_key: API key for authentication.
org_id: Organization ID for multi-tenant APIs.
timeout_ms: Request timeout in milliseconds (0 = default).
max_retries: Maximum retry attempts for failed requests (0 = default).
extra_params: Additional parameters to include in API requests. These are merged into the request body for OpenAI-compatible APIs. Useful for provider-specific parameters not covered by GenerationConfig.
Example:
>>> backend = OpenAICompatibleBackend(
...     base_url="https://api.together.xyz/v1",
...     api_key="...",
...     extra_params={"repetition_penalty": 1.1}
... )
headers: Custom HTTP headers to include in every request to this backend. Useful for enterprise networking requirements like authentication proxies, custom routing, or tracing headers.
Example:
>>> backend = OpenAICompatibleBackend(
...     base_url="https://internal-proxy.corp.example.com/llm/v1",
...     api_key="...",
...     headers={
...         "X-Request-ID": "abc123",
...         "X-Team-ID": "ml-team",
...         "X-Proxy-Auth": "secret-token",
...     }
... )
class ModelSpec
ModelSpec(
self,
ref: str,
backend: BackendSpec | None = None
)
Specification for a model with optional backend configuration.
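Example
A minimal sketch pairing a local model with an explicit LocalBackend (the values shown are the documented defaults):
>>> spec = ModelSpec(
...     "Qwen/Qwen3-0.6B",
...     backend=LocalBackend(gpu_layers=-1, use_mmap=True, num_threads=0),
... )
>>> router = Router([spec], default_model="Qwen/Qwen3-0.6B")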
class Capabilities
Capabilities(
self,
streaming: bool,
tool_calling: bool,
logprobs: bool,
embeddings: bool,
json_schema: bool
)
Capability flags for a model backend.
class RemoteModelInfo
RemoteModelInfo(
self,
id: str,
object: str = 'model',
created: int | None = None,
owned_by: str = ''
)
Information about a model from a remote endpoint.
Attributes
id: Model identifier (e.g., "Qwen/Qwen3-4B-Instruct-2507").
object: Object type (usually "model").
created: Unix timestamp when the model was created (optional).
owned_by: Owner/organization of the model.
def list_endpoint_models
talu.router.list_endpoint_models(
base_url: str,
api_key: str | None = None,
timeout: float = 10.0
) → list[RemoteModelInfo]
List available models from an OpenAI-compatible endpoint.
This function queries the /v1/models endpoint to discover what models are available on a remote server.
Parameters
base_url: Base URL of the server (e.g., "http://localhost:8000").
api_key: Optional API key for authentication.
timeout: Request timeout in seconds.
Returns
List of RemoteModelInfo objects describing available models.
Raises
IOError: If the server is not reachable.
ValidationError: If the response is invalid.
Example
>>> from talu.router import list_endpoint_models
>>> models = list_endpoint_models("http://localhost:8000")
>>> for m in models:
... print(m.id)
Qwen/Qwen3-4B-Instruct-2507
def check_endpoint
talu.router.check_endpoint(
base_url: str,
api_key: str | None = None,
timeout: float = 5.0
) → bool
Check if an OpenAI-compatible endpoint is available.
This is a lightweight check that attempts to connect to the /v1/models endpoint to verify the server is running.
Parameters
base_url: Base URL of the server (e.g., "http://localhost:8000").
api_key: Optional API key for authentication.
timeout: Request timeout in seconds.
Returns
True if the endpoint is available, False otherwise.
Example
>>> from talu.router import check_endpoint
>>> if check_endpoint("http://localhost:8000"):
... print("vLLM is running")
def get_model_ids
talu.router.get_model_ids(
base_url: str,
api_key: str | None = None,
timeout: float = 10.0
) → list[str]
Get just the model IDs from a remote endpoint.
Convenience function that returns only the model ID strings.
Parameters
base_url: Base URL of the server (e.g., "http://localhost:8000").
api_key: Optional API key for authentication.
timeout: Request timeout in seconds.
Returns
List of model ID strings.
Example
>>> from talu.router import get_model_ids
>>> ids = get_model_ids("http://localhost:8000")
>>> "Qwen/Qwen3-4B-Instruct-2507" in ids
True
SchemaStrategy
SchemaStrategy = "auto" | "typescript" | "json_schema" | "xml_schema"