Db

Storage and persistence for conversations and vectors

Classes

Class Description
VectorBatch Batch of vectors loaded from storage.
SearchResult Result from a single-query vector search.
SearchBatchResult Result from a multi-query vector search.
DocumentRecord Complete document with metadata and content.
DocumentSummary Lightweight document summary for list results.
DocumentSearchResult Search result with snippet.
ChangeRecord Document change event for CDC streams.
ChangeAction Change action type for CDC.
CompactionStats Compaction statistics for the document store.
Database Built-in Zig-backed storage.
VectorStore High-throughput embedding storage using TaluDB.
DocumentStore Document storage and management using TaluDB.

class VectorBatch

VectorBatch(
    self,
    ids: array[int],
    vectors: array[float],
    count: int,
    dims: int
)

Batch of vectors loaded from storage.

Attributes
ids

array("Q", ...) of u64 vector IDs.

vectors

array("f", ...) of float32 vectors, flattened.

count

Number of vectors in the batch.

dims

Vector dimensionality.

Example
>>> batch = store.load()
>>> print(f"Loaded {batch.count} vectors of dim {batch.dims}")
>>> for i in range(batch.count):
...     vec_id = batch.ids[i]
...     vec = batch.vectors[i * batch.dims : (i + 1) * batch.dims]

class SearchResult

SearchResult(
    self,
    ids: array[int],
    scores: array[float]
)

Result from a single-query vector search.

Attributes
ids

array("Q", ...) of matching vector IDs.

scores

array("f", ...) of similarity scores (dot product).

Example
>>> result = store.search(query, k=5)
>>> for vec_id, score in zip(result.ids, result.scores):
...     print(f"ID {vec_id}: score {score:.4f}")

class SearchBatchResult

SearchBatchResult(
    self,
    ids: array[int],
    scores: array[float],
    count_per_query: int
)

Result from a multi-query vector search.

Attributes
ids

array("Q", ...) of matching vector IDs (query_count * count_per_query).

scores

array("f", ...) of similarity scores (query_count * count_per_query).

count_per_query

Number of results per query.

Example
>>> result = store.search_batch(queries, dims=3, query_count=2, k=5)
>>> for q in range(2):
...     offset = q * result.count_per_query
...     for i in range(result.count_per_query):
...         print(f"Query {q}: ID {result.ids[offset+i]}, score {result.scores[offset+i]:.4f}")

class DocumentRecord

DocumentRecord(
    self,
    doc_id: str,
    doc_type: str,
    title: str,
    doc_json: str,
    tags_text: str | None = None,
    parent_id: str | None = None,
    marker: str | None = None,
    group_id: str | None = None,
    owner_id: str | None = None,
    created_at_ms: int = 0,
    updated_at_ms: int = 0,
    expires_at_ms: int = 0,
    content_hash: int = 0,
    seq_num: int = 0
)

Complete document with metadata and content.


class DocumentSummary

DocumentSummary(
    self,
    doc_id: str,
    doc_type: str,
    title: str,
    marker: str | None = None,
    created_at_ms: int = 0,
    updated_at_ms: int = 0
)

Lightweight document summary for list results.


class DocumentSearchResult

DocumentSearchResult(
    self,
    doc_id: str,
    doc_type: str,
    title: str,
    snippet: str
)

Search result with snippet.


class ChangeRecord

ChangeRecord(
    self,
    seq_num: int,
    doc_id: str,
    action: ChangeAction,
    timestamp_ms: int,
    doc_type: str | None = None,
    title: str | None = None
)

Document change event for CDC streams.


class ChangeAction

Change action type for CDC.


class CompactionStats

CompactionStats(
    self,
    total_documents: int,
    active_documents: int,
    expired_documents: int,
    deleted_documents: int,
    tombstone_count: int,
    delta_versions: int,
    estimated_garbage_bytes: int
)

Compaction statistics for the document store.


class Database

Database(self, location: str = ':memory:')

Built-in Zig-backed storage.

Items are stored in Zig memory by default. TaluDB persistence is available via the talu:// scheme.

Parameters
location

Storage location. Supported values:
  • ":memory:" (default, in-memory only)
  • "talu://<path>" (TaluDB on-disk persistence)
Example
>>> # Default in-memory storage
>>> chat = talu.Chat("model", storage=Database())
>>>
>>> # Explicit in-memory (same as default)
>>> chat = talu.Chat("model", storage=Database(":memory:"))
>>>
>>> # TaluDB persistence
>>> chat = talu.Chat("model", storage=Database("talu://./my-db"))
Note

Database is the default when no storage is specified. All item operations go through Zig for maximum performance.

Quick Reference

Properties

Name Type
location str

Methods

Method Description
list_sessions() List sessions with advanced filtering.
list_sessions_by_source() List sessions derived from a specific prompt document.

Properties

location: str

The storage location string.

Methods

def list_sessions(
    self,
    limit: int = 50,
    before_updated_at_ms: int = 0,
    before_session_id: str | None = None,
    group_id: str | None = None,
    search_query: str | None = None,
    tags_filter: str | None = None,
    tags_filter_any: str | None = None,
    marker_filter: str | None = None,
    marker_filter_any: str | None = None,
    model_filter: str | None = None,
    created_after_ms: int = 0,
    created_before_ms: int = 0,
    updated_after_ms: int = 0,
    updated_before_ms: int = 0,
    has_tags: bool | None = None,
    source_doc_id: str | None = None
) -> list[SessionRecord]

List sessions with advanced filtering.

Parameters
limit

Maximum number of sessions to return (default 50).

before_updated_at_ms

Cursor for pagination (0 = start from newest).

before_session_id

Session ID cursor for pagination.

group_id

Filter by group ID.

search_query

Full-text search in title/content.

tags_filter

Require ALL these tags (comma-separated).

tags_filter_any

Require ANY of these tags (comma-separated).

marker_filter

Require ALL these markers (comma-separated).

marker_filter_any

Require ANY of these markers (comma-separated).

model_filter

Filter by model name.

created_after_ms

Filter by creation time (inclusive).

created_before_ms

Filter by creation time (exclusive).

updated_after_ms

Filter by update time (inclusive).

updated_before_ms

Filter by update time (exclusive).

has_tags

Filter by presence of tags (True = only sessions with tags, False = only sessions without tags, None = no filter).

source_doc_id

Filter by source document ID.

Returns

List of session records ordered by updated_at descending.

Raises
ValidationError

If storage is not TaluDB.

StateError

If the query fails.

Example
>>> db = Database("talu://./my-db")
>>> # Find recent sessions with a specific tag
>>> sessions = db.list_sessions(tags_filter="project:acme", limit=10)
>>> # Search for sessions containing "bug fix"
>>> sessions = db.list_sessions(search_query="bug fix")
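
A hedged pagination sketch follows: feed the last row's cursor fields back into the next call. The examples on this page confirm subscript access via "session_id"; the "updated_at_ms" key is an assumption inferred from the before_updated_at_ms parameter.

>>> # Page through all sessions, newest first ("updated_at_ms" key is assumed)
>>> page = db.list_sessions(limit=50)
>>> while page:
...     for s in page:
...         print(s["session_id"])
...     last = page[-1]
...     page = db.list_sessions(
...         limit=50,
...         before_updated_at_ms=last["updated_at_ms"],
...         before_session_id=last["session_id"],
...     )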

def list_sessions_by_source(
    self,
    source_doc_id: str,
    limit: int = 50,
    before_updated_at_ms: int = 0,
    before_session_id: str | None = None
) -> list[SessionRecord]

List sessions derived from a specific prompt document.

This is a convenience method for lineage queries: finding all conversations that were spawned from a particular prompt document.

Parameters
source_doc_id

Document ID to filter by.

limit

Maximum number of sessions to return (default 50).

before_updated_at_ms

Cursor for pagination (0 = start from newest).

before_session_id

Session ID cursor for pagination.

Returns

List of session records ordered by updated_at descending.

Raises
ValidationError

If storage is not TaluDB.

StateError

If the query fails.

Example
>>> db = Database("talu://./my-db")
>>> sessions = db.list_sessions_by_source("doc_abc123")
>>> for s in sessions:
...     print(s["session_id"], s["title"])

class VectorStore

VectorStore(self, location: str | Path)

High-throughput embedding storage using TaluDB.

VectorStore provides efficient storage and similarity search for embedding vectors. It wraps the Zig-based TaluDB vector backend, offering crash-safe persistence and fast dot-product search.

Parameters
location

Path to the TaluDB vector database folder. If the folder doesn't exist, it will be created.

Example
>>> from array import array
>>> from talu.db import VectorStore
>>>
>>> # Create/open a vector store
>>> store = VectorStore("./my-vectors")
>>>
>>> # Append vectors (Structure-of-Arrays format)
>>> ids = array("Q", [1, 2, 3])  # u64 IDs
>>> vectors = array("f", [
...     1.0, 0.0, 0.0,  # vector 1
...     0.0, 1.0, 0.0,  # vector 2
...     0.0, 0.0, 1.0,  # vector 3
... ])
>>> store.append_batch(ids, vectors, dims=3)
>>>
>>> # Search for similar vectors
>>> query = array("f", [1.0, 0.0, 0.0])
>>> ids, scores = store.search(query, k=2)
>>> print(list(zip(ids, scores)))
[(1, 1.0), (2, 0.0)]
>>>
>>> # Always close when done
>>> store.close()
Note
VectorStore uses a Structure-of-Arrays format for efficiency:
  • ids: array("Q", ...) - u64 vector IDs
  • vectors: array("f", ...) - float32 vectors, flattened
  • dims: vector dimensionality

For 3 vectors of dim=2: vectors = [v0x, v0y, v1x, v1y, v2x, v2y]
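
A short packing sketch, standard library only: flatten per-row (id, vector) pairs into the SoA buffers described above.

>>> from array import array
>>> rows = [(1, [1.0, 0.0]), (2, [0.0, 1.0]), (3, [0.5, 0.5])]
>>> ids = array("Q", [row_id for row_id, _ in rows])           # u64 IDs
>>> vectors = array("f", [x for _, vec in rows for x in vec])  # flattened float32
>>> store.append_batch(ids, vectors, dims=2)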

Quick Reference

Methods

Method Description
append_batch() Append a batch of vectors to the store.
close() Close the vector store and release resources.
load() Load all vectors from the store.
scan() Stream scores for all vectors without top-k sorting.
scan_batch() Scan scores for multiple queries into flat buffers.
search() Search for the top-k most similar vectors.
search_batch() Search for the top-k most similar vectors for multiple queries.
set_durability() Set the write durability mode.

Methods

def append_batch(
    self,
    ids: array[int],
    vectors: array[float],
    dims: int
) -> None

Append a batch of vectors to the store.

Parameters
ids

array("Q", ...) of u64 vector IDs.

vectors

array("f", ...) of float32 vectors, flattened.

dims

Vector dimensionality.

Raises
StateError

If the store is closed.

ValidationError

If inputs are invalid.

IOError

If the append operation fails.

Example
>>> ids = array("Q", [1, 2])
>>> vectors = array("f", [1.0, 0.0, 0.0, 1.0])  # 2 vectors of dim=2
>>> store.append_batch(ids, vectors, dims=2)

def close(self) -> None

Close the vector store and release resources.

This method is idempotent - calling it multiple times is safe.

def load(self) -> VectorBatch

Load all vectors from the store.

Returns

VectorBatch with ids, vectors, count, and dims.

Raises
StateError

If the store is closed.

IOError

If the load operation fails.

def scan(self, query: array[float]) -> Iterator[tuple[int, float]]

Stream scores for all vectors without top-k sorting.

This is useful for applying custom thresholds or filters in Python.

Parameters
query

array("f", ...) query vector.

Yields

Tuples of (id, score) for each vector in the store.

Example
>>> query = array("f", [1.0, 0.0, 0.0])
>>> for vec_id, score in store.scan(query):
...     if score > 0.5:
...         print(f"ID {vec_id}: {score}")

def scan_batch(
    self,
    queries: array[float],
    dims: int,
    query_count: int
) -> tuple[array[int], array[float], int]

Scan scores for multiple queries into flat buffers.

Parameters
queries

array("f", ...) of query vectors, flattened.

dims

Vector dimensionality.

query_count

Number of queries.

Returns

Tuple of (ids, scores, total_rows):
  • ids: array("Q", ...) of all vector IDs
  • scores: array("f", ...) of all scores (query_count * total_rows)
  • total_rows: Number of vectors in the store
Raises
StateError

If the store is closed.

IOError

If the scan operation fails.

Note

Scores are laid out as [query][row]. For query i, row j: scores[i * total_rows + j]
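
Example

A sketch of recovering per-query rows from the flat buffers, following the layout above:

>>> queries = array("f", [1.0, 0.0, 0.0, 0.0, 1.0, 0.0])  # 2 queries of dim=3
>>> ids, scores, total_rows = store.scan_batch(queries, dims=3, query_count=2)
>>> for q in range(2):
...     row_scores = scores[q * total_rows : (q + 1) * total_rows]
...     best_id, best_score = max(zip(ids, row_scores), key=lambda pair: pair[1])
...     print(f"Query {q}: best ID {best_id} (score {best_score:.4f})")

def search(self, query: array[float], k: int) -> SearchResult

Search for the top-k most similar vectors.

The signature and return type shown here are reconstructed from the SearchResult examples earlier on this page; treat them as indicative rather than authoritative.

Parameters
query

array("f", ...) query vector.

k

Number of results to return.

Returns

SearchResult with ids and scores.

Example
>>> query = array("f", [1.0, 0.0, 0.0])
>>> result = store.search(query, k=5)
>>> for vec_id, score in zip(result.ids, result.scores):
...     print(f"ID {vec_id}: score {score:.4f}")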

def search_batch(
    self,
    queries: array[float],
    dims: int,
    query_count: int,
    k: int
) -> tuple[array[int], array[float], int]

Search for the top-k most similar vectors for multiple queries.

Parameters
queries

array("f", ...) of query vectors, flattened.

dims

Vector dimensionality.

query_count

Number of queries.

k

Number of results per query.

Returns

Tuple of (ids, scores, count_per_query):
  • ids: array("Q", ...) of matching vector IDs (query_count * k)
  • scores: array("f", ...) of similarity scores (query_count * k)
  • count_per_query: Actual results per query (may be < k if fewer vectors)
Raises
StateError

If the store is closed.

IOError

If the search operation fails.

Example
>>> queries = array("f", [1.0, 0.0, 0.0, 0.0, 1.0, 0.0])  # 2 queries
>>> ids, scores, count = store.search_batch(queries, dims=3, query_count=2, k=2)

def set_durability(self, mode: str) -> None

Set the write durability mode.

Parameters
mode

"full" (fsync per write) or "async_os" (OS-buffered).

Raises
ValueError

If mode is not a recognised durability string.

StateError

If the store is closed.
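
Example

A sketch of relaxing durability during a bulk load, then restoring it; the mode strings come from the parameter description above:

>>> store.set_durability("async_os")  # OS-buffered: faster, weaker crash safety
>>> store.append_batch(ids, vectors, dims=3)
>>> store.set_durability("full")      # back to fsync-per-write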


class DocumentStore

DocumentStore(self, location: str | Path)

Document storage and management using TaluDB.

DocumentStore provides persistent storage for documents with support for CRUD operations, search, tagging, TTL, versioning, and CDC.

Parameters
location

Path to the TaluDB database folder.

Example
>>> from talu.db import DocumentStore
>>>
>>> with DocumentStore("./my-docs") as store:
...     store.create(
...         doc_id="doc-123",
...         doc_type="prompt",
...         title="My Prompt",
...         doc_json='{"content": "Hello world"}',
...         tags_text="coding review",
...     )
...
...     doc = store.get("doc-123")
...     print(doc.title)
My Prompt
Note

Always close the store when done, or use it as a context manager.

Quick Reference

Methods

Method Description
add_tag() Add a tag to a document.
close() Close the document store.
count_expired() Count expired documents.
create() Create a new document.
create_delta() Create a delta version of a document.
delete() Delete a document.
get() Get a document by ID.
get_base_id() Get the base document ID for a delta.
get_by_tag() Get document IDs by tag.
get_changes() Get changes since a sequence number (CDC).
get_compaction_stats() Get compaction statistics.
get_delta_chain() Get the delta chain for a document.
get_garbage_candidates() Get document IDs that are candidates for garbage collection.
get_tags() Get tags for a document.
is_delta() Check if a document is a delta version.
list() List documents with optional filters.
purge_expired() Purge expired documents.
remove_tag() Remove a tag from a document.
search() Search documents by content.
search_batch() Batch search documents with multiple queries.
set_ttl() Set TTL for a document.
update() Update an existing document.

Methods

def add_tag(
    self,
    doc_id: str,
    tag_id: str,
    group_id: str | None = None
) -> None

Add a tag to a document.

Parameters
doc_id

Document identifier.

tag_id

Tag identifier.

group_id

Group ID for multi-tenant isolation (optional).

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def close(self) -> None

Close the document store.

This method is idempotent - calling it multiple times is safe.

def count_expired(self) -> int

Count expired documents.

Returns

Number of expired documents.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def create(
    self,
    doc_id: str,
    doc_type: str,
    title: str,
    doc_json: str,
    tags_text: str | None = None,
    parent_id: str | None = None,
    marker: str | None = None,
    group_id: str | None = None,
    owner_id: str | None = None
) -> None

Create a new document.

Parameters
doc_id

Unique document identifier.

doc_type

Document type (e.g., "prompt", "persona", "rag").

title

Human-readable title.

doc_json

JSON content payload.

tags_text

Space-separated tags for search (optional).

parent_id

Parent document ID for versioning (optional).

marker

Lifecycle marker like "active", "archived" (optional).

group_id

Group ID for multi-tenant isolation (optional).

owner_id

Owner ID for "My Docs" filtering (optional).

Raises
StateError

If the store is closed.

StorageError

If the document cannot be created.

ValidationError

If arguments are invalid.

def create_delta(
    self,
    base_doc_id: str,
    new_doc_id: str,
    delta_json: str,
    title: str | None = None,
    tags_text: str | None = None,
    marker: str | None = None
) -> None

Create a delta version of a document.

Delta versions store only the changes from a base document, saving storage space for frequently edited documents.

Parameters
base_doc_id

ID of the base document.

new_doc_id

ID for the new delta document.

delta_json

JSON patch/delta content.

title

Title for the delta (optional, inherits from base).

tags_text

Tags for the delta (optional).

marker

Marker for the delta (optional).

Raises
StateError

If the store is closed.

StorageError

If the operation fails.
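
Example

A sketch tying create(), create_delta(), and get_delta_chain() together; the delta_json payload shown is purely illustrative:

>>> store.create(
...     doc_id="doc-v1",
...     doc_type="prompt",
...     title="Greeting",
...     doc_json='{"content": "Hello"}',
... )
>>> store.create_delta(
...     base_doc_id="doc-v1",
...     new_doc_id="doc-v2",
...     delta_json='{"content": "Hello world"}',  # illustrative patch payload
... )
>>> [d.doc_id for d in store.get_delta_chain("doc-v2")]
['doc-v2', 'doc-v1']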

def delete(self, doc_id: str) -> None

Delete a document.

Parameters
doc_id

Document identifier.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def get(self, doc_id: str) -> DocumentRecord | None

Get a document by ID.

Parameters
doc_id

Document identifier.

Returns

DocumentRecord if found, None otherwise.

Raises
StateError

If the store is closed.

StorageError

If the operation fails (other than not found).

def get_base_id(self, doc_id: str) -> str | None

Get the base document ID for a delta.

Parameters
doc_id

Document identifier.

Returns

Base document ID, or None if not a delta.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def get_by_tag(self, tag_id: str) -> list[str]

Get document IDs by tag.

Parameters
tag_id

Tag identifier.

Returns

List of document IDs.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def get_changes(
    self,
    since_seq: int = 0,
    group_id: str | None = None,
    limit: int = 100
) -> list[ChangeRecord]

Get changes since a sequence number (CDC).

Parameters
since_seq

Sequence number to start from (exclusive).

group_id

Filter by group ID (optional).

limit

Maximum number of changes.

Returns

List of ChangeRecord objects.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.
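
Example

A minimal CDC polling sketch: remember the highest sequence number processed and pass it back as since_seq (exclusive, per the parameter description). This assumes changes arrive in ascending seq_num order.

>>> last_seq = 0
>>> while True:
...     changes = store.get_changes(since_seq=last_seq, limit=100)
...     if not changes:
...         break
...     for change in changes:
...         print(change.seq_num, change.doc_id, change.action)
...     last_seq = changes[-1].seq_num  # assumes ascending seq_num order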

def get_compaction_stats(self) -> CompactionStats

Get compaction statistics.

Returns

CompactionStats with document counts and garbage estimates.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.
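
Example

A short sketch reading the stats; the field names come from the CompactionStats class above:

>>> stats = store.get_compaction_stats()
>>> print(f"{stats.active_documents}/{stats.total_documents} active, "
...       f"~{stats.estimated_garbage_bytes} bytes of garbage")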

def get_delta_chain(self, doc_id: str) -> list[DocumentRecord]

Get the delta chain for a document.

Returns documents in order from the requested document back to the base. The first element is the requested document, the last is the base (full version).

Parameters
doc_id

Document identifier.

Returns

List of DocumentRecord objects forming the delta chain. For non-delta documents, returns a single-element list.

Raises
StateError

If the store is closed.

StorageError

If the document doesn't exist or operation fails.

Example
>>> # Get delta chain for a versioned document
>>> chain = store.get_delta_chain("doc-v3")
>>> print(f"Chain length: {len(chain)}")
Chain length: 3
>>> print(f"Base doc: {chain[-1].doc_id}")
Base doc: doc-v1

def get_garbage_candidates(self) -> list[str]

Get document IDs that are candidates for garbage collection.

Returns

List of document IDs.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def get_tags(self, doc_id: str) -> list[str]

Get tags for a document.

Parameters
doc_id

Document identifier.

Returns

List of tag IDs.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def is_delta(self, doc_id: str) -> bool

Check if a document is a delta version.

Parameters
doc_id

Document identifier.

Returns

True if the document is a delta version.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def list(
    self,
    doc_type: str | None = None,
    group_id: str | None = None,
    owner_id: str | None = None,
    marker: str | None = None,
    limit: int = 100
) -> list[DocumentSummary]

List documents with optional filters.

Parameters
doc_type

Filter by document type (optional).

group_id

Filter by group ID (optional).

owner_id

Filter by owner ID (optional).

marker

Filter by marker (optional).

limit

Maximum number of results.

Returns

List of DocumentSummary objects.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def purge_expired(self) -> int

Purge expired documents.

Returns

Number of documents purged.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def remove_tag(
    self,
    doc_id: str,
    tag_id: str,
    group_id: str | None = None
) -> None

Remove a tag from a document.

Parameters
doc_id

Document identifier.

tag_id

Tag identifier.

group_id

Group ID for multi-tenant isolation (optional).

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

def search_batch(self, queries: list[dict[str, str]]) -> dict[str, list[str]]

Batch search documents with multiple queries.

This is more efficient than multiple `search()` calls when you have many queries to execute.

Parameters
queries

List of query dicts, each with:
  • "id": Query identifier (for result mapping)
  • "text": Search query text
  • "type": Optional document type filter
Returns

Dict mapping query IDs to lists of matching document IDs.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.

ValidationError

If queries format is invalid.

Example
>>> queries = [
...     {"id": "q1", "text": "coding", "type": "prompt"},
...     {"id": "q2", "text": "review"},
... ]
>>> results = store.search_batch(queries)
>>> print(results)
{"q1": ["doc-a", "doc-b"], "q2": ["doc-c"]}

def set_ttl(
    self,
    doc_id: str,
    ttl_seconds: int
) -> None

Set TTL for a document.

Parameters
doc_id

Document identifier.

ttl_seconds

Time-to-live in seconds. 0 = never expires.

Raises
StateError

If the store is closed.

StorageError

If the operation fails.
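
Example

A sketch of the TTL lifecycle, combining set_ttl() with the expiry helpers documented above:

>>> store.set_ttl("doc-123", ttl_seconds=3600)  # expires in one hour
>>> # Later, e.g. from a maintenance job:
>>> if store.count_expired() > 0:
...     purged = store.purge_expired()
...     print(f"Purged {purged} expired documents")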

def update(
    self,
    doc_id: str,
    title: str | None = None,
    doc_json: str | None = None,
    tags_text: str | None = None,
    marker: str | None = None
) -> None

Update an existing document.

Only provided fields are updated; others are left unchanged.

Parameters
doc_id

Document identifier.

title

New title (optional).

doc_json

New JSON content (optional).

tags_text

New tags (optional).

marker

New marker (optional).

Raises
StateError

If the store is closed.

StorageError

If the document doesn't exist or update fails.
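
Example

A sketch of a partial update: only the title changes; content, tags, and marker are left untouched.

>>> store.update("doc-123", title="My Prompt (revised)")
>>> store.get("doc-123").title
'My Prompt (revised)'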