SimpleVecDB provides async wrappers for use in async/await contexts. They are thin wrappers around the synchronous API that run blocking SQLite operations in a ThreadPoolExecutor, so the event loop is never blocked.

Quick Start

import asyncio
from simplevecdb import AsyncVectorDB

async def main():
    db = AsyncVectorDB("vectors.db")
    collection = db.collection("docs")

    # Add documents asynchronously
    ids = await collection.add_texts(
        ["Hello world", "Async is great"],
        embeddings=[[0.1] * 384, [0.2] * 384]
    )

    # Search asynchronously
    results = await collection.similarity_search([0.1] * 384, k=5)

    await db.close()
    return results

results = asyncio.run(main())

Configuration

The async wrappers run blocking database calls in a shared ThreadPoolExecutor. You can configure the number of worker threads:

# Default: 4 workers
db = AsyncVectorDB("vectors.db")

# Custom worker count
db = AsyncVectorDB("vectors.db", max_workers=8)

Available Methods

AsyncVectorCollection provides async versions of the synchronous search and modification methods (the clustering helpers such as cluster() and auto_tag() are wrapped as well; see the API reference below):

Sync Method                          Async Method
add_texts()                          await collection.add_texts()
similarity_search()                  await collection.similarity_search()
similarity_search_batch()            await collection.similarity_search_batch()
keyword_search()                     await collection.keyword_search()
hybrid_search()                      await collection.hybrid_search()
max_marginal_relevance_search()      await collection.max_marginal_relevance_search()
delete_by_ids()                      await collection.delete_by_ids()
remove_texts()                       await collection.remove_texts()

Properties remain synchronous and can be accessed without await:

  • collection.name - Collection name

Concurrent Operations

Run multiple searches in parallel with asyncio.gather or use batch search for better performance:

async def concurrent_search():
    db = AsyncVectorDB("vectors.db")
    collection = db.collection("docs")

    queries = [[0.1] * 384, [0.2] * 384, [0.3] * 384]

    # Option 1: Batch search (recommended, ~10x faster)
    batch_results = await collection.similarity_search_batch(queries, k=5)

    # Option 2: Concurrent individual searches
    individual_results = await asyncio.gather(*[
        collection.similarity_search(q, k=5)
        for q in queries
    ])
    return batch_results, individual_results

When to Use

Use Async API when:

  • Building async web servers (FastAPI, aiohttp; see the sketch after this section)
  • Running concurrent searches
  • Integrating with async frameworks

Use Sync API when:

  • Simple scripts and notebooks
  • Single-threaded applications
  • Maximum simplicity is needed
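
For example, a minimal FastAPI integration might look like the sketch below. Only AsyncVectorDB, collection(), similarity_search(), and close() come from SimpleVecDB; the /search route, the embed() placeholder, and the 384-dimensional vectors are illustrative assumptions.

from contextlib import asynccontextmanager

from fastapi import FastAPI
from simplevecdb import AsyncVectorDB


def embed(text: str) -> list[float]:
    # Placeholder embedder; swap in your real embedding model.
    return [0.1] * 384


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Open the database once at startup, close it on shutdown.
    app.state.db = AsyncVectorDB("vectors.db")
    yield
    await app.state.db.close()


app = FastAPI(lifespan=lifespan)


@app.get("/search")
async def search(q: str, k: int = 5):
    collection = app.state.db.collection("docs")
    results = await collection.similarity_search(embed(q), k=k)
    # Document field names vary, so this sketch returns only the scores.
    return {"scores": [score for _doc, score in results]}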

API Reference

simplevecdb.async_core.AsyncVectorDB

Async wrapper for VectorDB.

Creates a thread pool executor for running synchronous SQLite operations without blocking the async event loop.

Example

async def main():
    db = AsyncVectorDB("my_vectors.db")
    collection = db.collection("documents")
    await collection.add_texts(["hello"], embeddings=[[0.1] * 384])
    results = await collection.similarity_search([0.1] * 384)
    await db.close()

Parameters:

path (str, default ':memory:')
    Path to SQLite database file. Use ":memory:" for in-memory DB.
distance_strategy (DistanceStrategy, default COSINE)
    Distance metric (COSINE, L2, or L1).
quantization (Quantization, default FLOAT)
    Vector quantization (FLOAT, INT8, or BIT).
max_workers (int, default 4)
    Number of threads in the executor pool.
**kwargs (Any)
    Additional arguments passed to VectorDB.
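
A hedged construction sketch showing the non-default options (this assumes DistanceStrategy and Quantization are importable from the top-level simplevecdb package; adjust the import to wherever they are exported in your install):

from simplevecdb import AsyncVectorDB, DistanceStrategy, Quantization

db = AsyncVectorDB(
    "vectors.db",
    distance_strategy=DistanceStrategy.L2,   # COSINE (default), L2, or L1
    quantization=Quantization.INT8,          # FLOAT (default), INT8, or BIT
    max_workers=8,                           # threads in the executor pool
)
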
Source code in src/simplevecdb/async_core.py
class AsyncVectorDB:
    """
    Async wrapper for VectorDB.

    Creates a thread pool executor for running synchronous SQLite operations
    without blocking the async event loop.

    Example:
        >>> async def main():
        ...     db = AsyncVectorDB("my_vectors.db")
        ...     collection = db.collection("documents")
        ...     await collection.add_texts(["hello"], embeddings=[[0.1]*384])
        ...     results = await collection.similarity_search([0.1]*384)
        ...     await db.close()

    Args:
        path: Path to SQLite database file. Use ":memory:" for in-memory DB.
        distance_strategy: Distance metric (COSINE, L2, or L1).
        quantization: Vector quantization (FLOAT, INT8, or BIT).
        max_workers: Number of threads in executor pool. Default 4.
        **kwargs: Additional arguments passed to VectorDB.
    """

    def __init__(
        self,
        path: str = ":memory:",
        distance_strategy: DistanceStrategy = DistanceStrategy.COSINE,
        quantization: Quantization = Quantization.FLOAT,
        max_workers: int = 4,
        **kwargs: Any,
    ):
        self._db = VectorDB(
            path=path,
            distance_strategy=distance_strategy,
            quantization=quantization,
            **kwargs,
        )
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._collections: dict[tuple, AsyncVectorCollection] = {}
        self._collections_lock = Lock()  # Thread-safe collection caching

    def collection(
        self,
        name: str = "default",
        distance_strategy: DistanceStrategy | None = None,
        quantization: Quantization | None = None,
    ) -> AsyncVectorCollection:
        """
        Get or create a named vector collection.

        Args:
            name: Collection name (alphanumeric + underscore only).
            distance_strategy: Override database-level distance metric.
            quantization: Override database-level quantization.

        Returns:
            AsyncVectorCollection instance.
        """
        cache_key = (name, distance_strategy, quantization)
        with self._collections_lock:
            if cache_key not in self._collections:
                sync_collection = self._db.collection(
                    name,
                    distance_strategy=distance_strategy,
                    quantization=quantization,
                )
                self._collections[cache_key] = AsyncVectorCollection(
                    sync_collection, self._executor
                )
            return self._collections[cache_key]

    def list_collections(self) -> list[str]:
        """Return names of all initialized collections."""
        return self._db.list_collections()

    async def search_collections(
        self,
        query: Sequence[float],
        collections: list[str] | None = None,
        k: int = 10,
        filter: dict[str, Any] | None = None,
        *,
        normalize_scores: bool = True,
        parallel: bool = True,
    ) -> list[tuple[Document, float, str]]:
        """
        Search across multiple collections with merged, ranked results.

        See VectorDB.search_collections for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._db.search_collections(
                query,
                collections,
                k,
                filter,
                normalize_scores=normalize_scores,
                parallel=parallel,
            ),
        )

    async def vacuum(self, checkpoint_wal: bool = True) -> None:
        """
        Reclaim disk space by rebuilding the database file.

        Async wrapper for VectorDB.vacuum(). See sync version for details.

        Args:
            checkpoint_wal: If True (default), also truncate the WAL file.
        """
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            self._executor, lambda: self._db.vacuum(checkpoint_wal)
        )

    async def close(self) -> None:
        """Close the database connection and shutdown executor."""
        # Run shutdown in executor to avoid blocking event loop
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(None, self._executor.shutdown, True)
        finally:
            self._db.close()

    async def __aenter__(self) -> "AsyncVectorDB":
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit."""
        await self.close()

collection(name='default', distance_strategy=None, quantization=None)

Get or create a named vector collection.

Parameters:

name (str, default 'default')
    Collection name (alphanumeric + underscore only).
distance_strategy (DistanceStrategy | None, default None)
    Override database-level distance metric.
quantization (Quantization | None, default None)
    Override database-level quantization.

Returns:

AsyncVectorCollection
    AsyncVectorCollection instance.

Source code in src/simplevecdb/async_core.py
def collection(
    self,
    name: str = "default",
    distance_strategy: DistanceStrategy | None = None,
    quantization: Quantization | None = None,
) -> AsyncVectorCollection:
    """
    Get or create a named vector collection.

    Args:
        name: Collection name (alphanumeric + underscore only).
        distance_strategy: Override database-level distance metric.
        quantization: Override database-level quantization.

    Returns:
        AsyncVectorCollection instance.
    """
    cache_key = (name, distance_strategy, quantization)
    with self._collections_lock:
        if cache_key not in self._collections:
            sync_collection = self._db.collection(
                name,
                distance_strategy=distance_strategy,
                quantization=quantization,
            )
            self._collections[cache_key] = AsyncVectorCollection(
                sync_collection, self._executor
            )
        return self._collections[cache_key]

list_collections()

Return names of all initialized collections.

Source code in src/simplevecdb/async_core.py
def list_collections(self) -> list[str]:
    """Return names of all initialized collections."""
    return self._db.list_collections()

search_collections(query, collections=None, k=10, filter=None, *, normalize_scores=True, parallel=True) async

Search across multiple collections with merged, ranked results.

See VectorDB.search_collections for full documentation.
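
Example (a hedged usage sketch; the collection names and the 384-dimensional query vector are illustrative):

from simplevecdb import AsyncVectorDB

async def search_all(db: AsyncVectorDB) -> None:
    query = [0.1] * 384
    # Search two named collections and get merged, ranked results back.
    hits = await db.search_collections(query, collections=["docs", "notes"], k=10)
    for doc, score, collection_name in hits:
        print(collection_name, score)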

Source code in src/simplevecdb/async_core.py
async def search_collections(
    self,
    query: Sequence[float],
    collections: list[str] | None = None,
    k: int = 10,
    filter: dict[str, Any] | None = None,
    *,
    normalize_scores: bool = True,
    parallel: bool = True,
) -> list[tuple[Document, float, str]]:
    """
    Search across multiple collections with merged, ranked results.

    See VectorDB.search_collections for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._db.search_collections(
            query,
            collections,
            k,
            filter,
            normalize_scores=normalize_scores,
            parallel=parallel,
        ),
    )

vacuum(checkpoint_wal=True) async

Reclaim disk space by rebuilding the database file.

Async wrapper for VectorDB.vacuum(). See sync version for details.

Parameters:

checkpoint_wal (bool, default True)
    If True (default), also truncate the WAL file.
Source code in src/simplevecdb/async_core.py
async def vacuum(self, checkpoint_wal: bool = True) -> None:
    """
    Reclaim disk space by rebuilding the database file.

    Async wrapper for VectorDB.vacuum(). See sync version for details.

    Args:
        checkpoint_wal: If True (default), also truncate the WAL file.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        self._executor, lambda: self._db.vacuum(checkpoint_wal)
    )

close() async

Close the database connection and shutdown executor.

Source code in src/simplevecdb/async_core.py
async def close(self) -> None:
    """Close the database connection and shutdown executor."""
    # Run shutdown in executor to avoid blocking event loop
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(None, self._executor.shutdown, True)
    finally:
        self._db.close()

__aenter__() async

Async context manager entry.

Source code in src/simplevecdb/async_core.py
async def __aenter__(self) -> "AsyncVectorDB":
    """Async context manager entry."""
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

Source code in src/simplevecdb/async_core.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Async context manager exit."""
    await self.close()

simplevecdb.async_core.AsyncVectorCollection

Async wrapper for VectorCollection.

All methods are async versions of the synchronous VectorCollection methods, executed in a thread pool to avoid blocking the event loop.

Source code in src/simplevecdb/async_core.py
class AsyncVectorCollection:
    """
    Async wrapper for VectorCollection.

    All methods are async versions of the synchronous VectorCollection methods,
    executed in a thread pool to avoid blocking the event loop.
    """

    def __init__(
        self,
        sync_collection: VectorCollection,
        executor: ThreadPoolExecutor,
    ):
        self._collection = sync_collection
        self._executor = executor

    @property
    def name(self) -> str:
        """Collection name."""
        return self._collection.name

    async def add_texts(
        self,
        texts: Sequence[str],
        metadatas: Sequence[dict] | None = None,
        embeddings: Sequence[Sequence[float]] | None = None,
        ids: Sequence[int | None] | None = None,
    ) -> list[int]:
        """
        Add texts with optional embeddings and metadata.

        See VectorCollection.add_texts for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.add_texts(texts, metadatas, embeddings, ids),
        )

    async def similarity_search(
        self,
        query: str | Sequence[float],
        k: int = 5,
        filter: dict[str, Any] | None = None,
        *,
        exact: bool | None = None,
        threads: int = 0,
    ) -> list[tuple[Document, float]]:
        """
        Search for most similar vectors.

        See VectorCollection.similarity_search for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.similarity_search(
                query, k, filter, exact=exact, threads=threads
            ),
        )

    async def similarity_search_batch(
        self,
        queries: Sequence[Sequence[float]],
        k: int = 5,
        filter: dict[str, Any] | None = None,
        *,
        exact: bool | None = None,
        threads: int = 0,
    ) -> list[list[tuple[Document, float]]]:
        """
        Batch search for multiple query vectors.

        See VectorCollection.similarity_search_batch for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.similarity_search_batch(
                queries, k, filter, exact=exact, threads=threads
            ),
        )

    async def keyword_search(
        self,
        query: str,
        k: int = 5,
        filter: dict[str, Any] | None = None,
    ) -> list[tuple[Document, float]]:
        """
        Search using BM25 keyword ranking.

        See VectorCollection.keyword_search for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.keyword_search(query, k, filter),
        )

    async def hybrid_search(
        self,
        query: str,
        k: int = 5,
        filter: dict[str, Any] | None = None,
        *,
        query_vector: Sequence[float] | None = None,
        vector_k: int | None = None,
        keyword_k: int | None = None,
        rrf_k: int = 60,
    ) -> list[tuple[Document, float]]:
        """
        Combine keyword and vector search using Reciprocal Rank Fusion.

        See VectorCollection.hybrid_search for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.hybrid_search(
                query,
                k,
                filter,
                query_vector=query_vector,
                vector_k=vector_k,
                keyword_k=keyword_k,
                rrf_k=rrf_k,
            ),
        )

    async def max_marginal_relevance_search(
        self,
        query: str | Sequence[float],
        k: int = 5,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: dict[str, Any] | None = None,
    ) -> list[Document]:
        """
        Search with diversity using Max Marginal Relevance.

        See VectorCollection.max_marginal_relevance_search for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.max_marginal_relevance_search(
                query, k, fetch_k, lambda_mult, filter
            ),
        )

    async def delete_by_ids(self, ids: Sequence[int]) -> None:
        """
        Delete documents by their IDs.

        See VectorCollection.delete_by_ids for full documentation.
        """
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            self._executor,
            lambda: self._collection.delete_by_ids(ids),
        )

    async def remove_texts(
        self,
        texts: Sequence[str] | None = None,
        filter: dict[str, Any] | None = None,
    ) -> int:
        """
        Remove documents by text content or metadata filter.

        See VectorCollection.remove_texts for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.remove_texts(texts, filter),
        )

    # ─────────────────────────────────────────────────────────────────────────
    # Clustering Methods (Async)
    # ─────────────────────────────────────────────────────────────────────────

    async def cluster(
        self,
        n_clusters: int | None = None,
        algorithm: str = "minibatch_kmeans",
        *,
        filter: dict[str, Any] | None = None,
        sample_size: int | None = None,
        min_cluster_size: int = 5,
        random_state: int | None = None,
    ) -> Any:
        """
        Cluster documents by their embeddings (async).

        See VectorCollection.cluster for full documentation.
        """

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.cluster(
                n_clusters,
                algorithm,  # type: ignore[arg-type]
                filter=filter,
                sample_size=sample_size,
                min_cluster_size=min_cluster_size,
                random_state=random_state,
            ),
        )

    async def auto_tag(
        self,
        cluster_result: Any,
        *,
        method: str = "keywords",
        n_keywords: int = 5,
        custom_callback: Any = None,
    ) -> dict[int, str]:
        """
        Generate descriptive tags for clusters (async).

        See VectorCollection.auto_tag for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.auto_tag(
                cluster_result,
                method=method,
                n_keywords=n_keywords,
                custom_callback=custom_callback,
            ),
        )

    async def assign_cluster_metadata(
        self,
        cluster_result: Any,
        tags: dict[int, str] | None = None,
        *,
        metadata_key: str = "cluster",
        tag_key: str = "cluster_tag",
    ) -> int:
        """
        Persist cluster assignments to metadata (async).

        See VectorCollection.assign_cluster_metadata for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.assign_cluster_metadata(
                cluster_result,
                tags,
                metadata_key=metadata_key,
                tag_key=tag_key,
            ),
        )

    async def get_cluster_members(
        self,
        cluster_id: int,
        *,
        metadata_key: str = "cluster",
    ) -> list[Document]:
        """
        Get all documents in a cluster (async).

        See VectorCollection.get_cluster_members for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.get_cluster_members(
                cluster_id, metadata_key=metadata_key
            ),
        )

    async def save_cluster(
        self,
        name: str,
        cluster_result: Any,
        *,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """
        Save cluster state for later assignment (async).

        See VectorCollection.save_cluster for full documentation.
        """
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            self._executor,
            lambda: self._collection.save_cluster(
                name, cluster_result, metadata=metadata
            ),
        )

    async def load_cluster(
        self,
        name: str,
    ) -> tuple[Any, dict[str, Any]] | None:
        """
        Load saved cluster state (async).

        See VectorCollection.load_cluster for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.load_cluster(name),
        )

    async def list_clusters(self) -> list[dict[str, Any]]:
        """
        List all saved cluster configurations (async).

        See VectorCollection.list_clusters for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            self._collection.list_clusters,
        )

    async def delete_cluster(self, name: str) -> bool:
        """
        Delete a saved cluster configuration (async).

        See VectorCollection.delete_cluster for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.delete_cluster(name),
        )

    async def assign_to_cluster(
        self,
        name: str,
        doc_ids: list[int],
        *,
        metadata_key: str = "cluster",
    ) -> int:
        """
        Assign documents to a saved cluster (async).

        See VectorCollection.assign_to_cluster for full documentation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self._collection.assign_to_cluster(
                name, doc_ids, metadata_key=metadata_key
            ),
        )

name property

Collection name.

add_texts(texts, metadatas=None, embeddings=None, ids=None) async

Add texts with optional embeddings and metadata.

See VectorCollection.add_texts for full documentation.
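
Example (a hedged usage sketch; the metadata keys and 384-dimensional embeddings are illustrative):

from simplevecdb.async_core import AsyncVectorCollection

async def ingest(collection: AsyncVectorCollection) -> list[int]:
    # Returns the assigned integer IDs for the inserted documents.
    return await collection.add_texts(
        ["Hello world", "Async is great"],
        metadatas=[{"source": "a.txt"}, {"source": "b.txt"}],
        embeddings=[[0.1] * 384, [0.2] * 384],
    )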

Source code in src/simplevecdb/async_core.py
async def add_texts(
    self,
    texts: Sequence[str],
    metadatas: Sequence[dict] | None = None,
    embeddings: Sequence[Sequence[float]] | None = None,
    ids: Sequence[int | None] | None = None,
) -> list[int]:
    """
    Add texts with optional embeddings and metadata.

    See VectorCollection.add_texts for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.add_texts(texts, metadatas, embeddings, ids),
    )

similarity_search(query, k=5, filter=None, *, exact=None, threads=0) async

Search for most similar vectors.

See VectorCollection.similarity_search for full documentation.

Source code in src/simplevecdb/async_core.py
async def similarity_search(
    self,
    query: str | Sequence[float],
    k: int = 5,
    filter: dict[str, Any] | None = None,
    *,
    exact: bool | None = None,
    threads: int = 0,
) -> list[tuple[Document, float]]:
    """
    Search for most similar vectors.

    See VectorCollection.similarity_search for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.similarity_search(
            query, k, filter, exact=exact, threads=threads
        ),
    )

similarity_search_batch(queries, k=5, filter=None, *, exact=None, threads=0) async

Batch search for multiple query vectors.

See VectorCollection.similarity_search_batch for full documentation.

Source code in src/simplevecdb/async_core.py
async def similarity_search_batch(
    self,
    queries: Sequence[Sequence[float]],
    k: int = 5,
    filter: dict[str, Any] | None = None,
    *,
    exact: bool | None = None,
    threads: int = 0,
) -> list[list[tuple[Document, float]]]:
    """
    Batch search for multiple query vectors.

    See VectorCollection.similarity_search_batch for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.similarity_search_batch(
            queries, k, filter, exact=exact, threads=threads
        ),
    )

keyword_search(query, k=5, filter=None) async

Search using BM25 keyword ranking.

See VectorCollection.keyword_search for full documentation.

Source code in src/simplevecdb/async_core.py
async def keyword_search(
    self,
    query: str,
    k: int = 5,
    filter: dict[str, Any] | None = None,
) -> list[tuple[Document, float]]:
    """
    Search using BM25 keyword ranking.

    See VectorCollection.keyword_search for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.keyword_search(query, k, filter),
    )

hybrid_search(query, k=5, filter=None, *, query_vector=None, vector_k=None, keyword_k=None, rrf_k=60) async

Combine keyword and vector search using Reciprocal Rank Fusion.

See VectorCollection.hybrid_search for full documentation.
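
Example (a hedged usage sketch; the query text and the precomputed 384-dimensional query_vector are illustrative):

from simplevecdb.async_core import AsyncVectorCollection

async def hybrid(collection: AsyncVectorCollection) -> None:
    results = await collection.hybrid_search(
        "vector databases",         # keyword (BM25) side of the fusion
        k=5,
        query_vector=[0.1] * 384,   # optional vector side of the fusion
        rrf_k=60,                   # Reciprocal Rank Fusion constant
    )
    for doc, score in results:
        print(score)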

Source code in src/simplevecdb/async_core.py
async def hybrid_search(
    self,
    query: str,
    k: int = 5,
    filter: dict[str, Any] | None = None,
    *,
    query_vector: Sequence[float] | None = None,
    vector_k: int | None = None,
    keyword_k: int | None = None,
    rrf_k: int = 60,
) -> list[tuple[Document, float]]:
    """
    Combine keyword and vector search using Reciprocal Rank Fusion.

    See VectorCollection.hybrid_search for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.hybrid_search(
            query,
            k,
            filter,
            query_vector=query_vector,
            vector_k=vector_k,
            keyword_k=keyword_k,
            rrf_k=rrf_k,
        ),
    )

max_marginal_relevance_search(query, k=5, fetch_k=20, lambda_mult=0.5, filter=None) async

Search with diversity using Max Marginal Relevance.

See VectorCollection.max_marginal_relevance_search for full documentation.
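
Example (a hedged usage sketch; the 384-dimensional query vector is illustrative):

from simplevecdb.async_core import AsyncVectorCollection

async def diverse(collection: AsyncVectorCollection):
    # Fetch 20 candidates, then keep the 5 that best balance relevance
    # against diversity (lambda_mult controls the trade-off).
    return await collection.max_marginal_relevance_search(
        [0.1] * 384, k=5, fetch_k=20, lambda_mult=0.5
    )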

Source code in src/simplevecdb/async_core.py
async def max_marginal_relevance_search(
    self,
    query: str | Sequence[float],
    k: int = 5,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: dict[str, Any] | None = None,
) -> list[Document]:
    """
    Search with diversity using Max Marginal Relevance.

    See VectorCollection.max_marginal_relevance_search for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.max_marginal_relevance_search(
            query, k, fetch_k, lambda_mult, filter
        ),
    )

delete_by_ids(ids) async

Delete documents by their IDs.

See VectorCollection.delete_by_ids for full documentation.

Source code in src/simplevecdb/async_core.py
async def delete_by_ids(self, ids: Sequence[int]) -> None:
    """
    Delete documents by their IDs.

    See VectorCollection.delete_by_ids for full documentation.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        self._executor,
        lambda: self._collection.delete_by_ids(ids),
    )

remove_texts(texts=None, filter=None) async

Remove documents by text content or metadata filter.

See VectorCollection.remove_texts for full documentation.
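
Example (a hedged usage sketch):

from simplevecdb.async_core import AsyncVectorCollection

async def prune(collection: AsyncVectorCollection) -> int:
    # Remove documents by their text content; returns the number removed.
    return await collection.remove_texts(texts=["Hello world"])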

Source code in src/simplevecdb/async_core.py
async def remove_texts(
    self,
    texts: Sequence[str] | None = None,
    filter: dict[str, Any] | None = None,
) -> int:
    """
    Remove documents by text content or metadata filter.

    See VectorCollection.remove_texts for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.remove_texts(texts, filter),
    )

cluster(n_clusters=None, algorithm='minibatch_kmeans', *, filter=None, sample_size=None, min_cluster_size=5, random_state=None) async

Cluster documents by their embeddings (async).

See VectorCollection.cluster for full documentation.
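
Example (a hedged end-to-end sketch of the clustering workflow; the cluster count is illustrative and the exact type of the cluster result is defined by the sync API):

from simplevecdb.async_core import AsyncVectorCollection

async def tag_clusters(collection: AsyncVectorCollection) -> None:
    # Cluster embeddings, generate keyword tags, then persist both the
    # cluster ids and the tags into document metadata.
    result = await collection.cluster(n_clusters=8, algorithm="minibatch_kmeans")
    tags = await collection.auto_tag(result, method="keywords", n_keywords=5)
    updated = await collection.assign_cluster_metadata(result, tags)
    print(f"tagged {updated} documents")

    # Later: fetch everything assigned to cluster 0.
    members = await collection.get_cluster_members(0)
    print(len(members))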

Source code in src/simplevecdb/async_core.py
async def cluster(
    self,
    n_clusters: int | None = None,
    algorithm: str = "minibatch_kmeans",
    *,
    filter: dict[str, Any] | None = None,
    sample_size: int | None = None,
    min_cluster_size: int = 5,
    random_state: int | None = None,
) -> Any:
    """
    Cluster documents by their embeddings (async).

    See VectorCollection.cluster for full documentation.
    """

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.cluster(
            n_clusters,
            algorithm,  # type: ignore[arg-type]
            filter=filter,
            sample_size=sample_size,
            min_cluster_size=min_cluster_size,
            random_state=random_state,
        ),
    )

auto_tag(cluster_result, *, method='keywords', n_keywords=5, custom_callback=None) async

Generate descriptive tags for clusters (async).

See VectorCollection.auto_tag for full documentation.

Source code in src/simplevecdb/async_core.py
async def auto_tag(
    self,
    cluster_result: Any,
    *,
    method: str = "keywords",
    n_keywords: int = 5,
    custom_callback: Any = None,
) -> dict[int, str]:
    """
    Generate descriptive tags for clusters (async).

    See VectorCollection.auto_tag for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.auto_tag(
            cluster_result,
            method=method,
            n_keywords=n_keywords,
            custom_callback=custom_callback,
        ),
    )

assign_cluster_metadata(cluster_result, tags=None, *, metadata_key='cluster', tag_key='cluster_tag') async

Persist cluster assignments to metadata (async).

See VectorCollection.assign_cluster_metadata for full documentation.

Source code in src/simplevecdb/async_core.py
async def assign_cluster_metadata(
    self,
    cluster_result: Any,
    tags: dict[int, str] | None = None,
    *,
    metadata_key: str = "cluster",
    tag_key: str = "cluster_tag",
) -> int:
    """
    Persist cluster assignments to metadata (async).

    See VectorCollection.assign_cluster_metadata for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.assign_cluster_metadata(
            cluster_result,
            tags,
            metadata_key=metadata_key,
            tag_key=tag_key,
        ),
    )

get_cluster_members(cluster_id, *, metadata_key='cluster') async

Get all documents in a cluster (async).

See VectorCollection.get_cluster_members for full documentation.

Source code in src/simplevecdb/async_core.py
async def get_cluster_members(
    self,
    cluster_id: int,
    *,
    metadata_key: str = "cluster",
) -> list[Document]:
    """
    Get all documents in a cluster (async).

    See VectorCollection.get_cluster_members for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.get_cluster_members(
            cluster_id, metadata_key=metadata_key
        ),
    )

save_cluster(name, cluster_result, *, metadata=None) async

Save cluster state for later assignment (async).

See VectorCollection.save_cluster for full documentation.

Source code in src/simplevecdb/async_core.py
async def save_cluster(
    self,
    name: str,
    cluster_result: Any,
    *,
    metadata: dict[str, Any] | None = None,
) -> None:
    """
    Save cluster state for later assignment (async).

    See VectorCollection.save_cluster for full documentation.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        self._executor,
        lambda: self._collection.save_cluster(
            name, cluster_result, metadata=metadata
        ),
    )

load_cluster(name) async

Load saved cluster state (async).

See VectorCollection.load_cluster for full documentation.

Source code in src/simplevecdb/async_core.py
async def load_cluster(
    self,
    name: str,
) -> tuple[Any, dict[str, Any]] | None:
    """
    Load saved cluster state (async).

    See VectorCollection.load_cluster for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.load_cluster(name),
    )

list_clusters() async

List all saved cluster configurations (async).

See VectorCollection.list_clusters for full documentation.

Source code in src/simplevecdb/async_core.py
async def list_clusters(self) -> list[dict[str, Any]]:
    """
    List all saved cluster configurations (async).

    See VectorCollection.list_clusters for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        self._collection.list_clusters,
    )

delete_cluster(name) async

Delete a saved cluster configuration (async).

See VectorCollection.delete_cluster for full documentation.

Source code in src/simplevecdb/async_core.py
async def delete_cluster(self, name: str) -> bool:
    """
    Delete a saved cluster configuration (async).

    See VectorCollection.delete_cluster for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.delete_cluster(name),
    )

assign_to_cluster(name, doc_ids, *, metadata_key='cluster') async

Assign documents to a saved cluster (async).

See VectorCollection.assign_to_cluster for full documentation.

Source code in src/simplevecdb/async_core.py
async def assign_to_cluster(
    self,
    name: str,
    doc_ids: list[int],
    *,
    metadata_key: str = "cluster",
) -> int:
    """
    Assign documents to a saved cluster (async).

    See VectorCollection.assign_to_cluster for full documentation.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self._collection.assign_to_cluster(
            name, doc_ids, metadata_key=metadata_key
        ),
    )