simplevecdb.embeddings.models

load_model(repo_id)

Load (and cache on disk) a SentenceTransformer for the given repo id.

Source code in src/simplevecdb/embeddings/models.py
def load_model(repo_id: str) -> SentenceTransformerType:
    """Load (and cache on disk) a SentenceTransformer for the given repo id."""
    # Make sure the on-disk cache directory exists before any download.
    CACHE_DIR.mkdir(parents=True, exist_ok=True)

    download = _load_snapshot_download()
    transformer_cls = _load_sentence_transformer_cls()

    # First call fetches the snapshot from the hub; later calls resolve
    # against the local cache.
    local_path = download(
        repo_id=repo_id,
        cache_dir=CACHE_DIR,
        local_files_only=False,
    )

    # PyTorch backend is the most compatible choice; the ONNX backend has
    # known compatibility issues with optimum>=2.0.
    return transformer_cls(
        local_path,
        tokenizer_kwargs={"padding": True, "truncation": True, "max_length": 512},
        backend="torch",
    )

get_embedder(model_id=None)

Return a cached embedder for the requested model (defaults to config value).

Source code in src/simplevecdb/embeddings/models.py
def get_embedder(model_id: str | None = None) -> SentenceTransformerType:
    """Return a cached embedder for the requested model (defaults to config value)."""
    repo_id = model_id or DEFAULT_MODEL
    # Serialize cache access so concurrent callers never load the same
    # model twice.
    with _model_lock:
        try:
            return _loaded_models[repo_id]
        except KeyError:
            embedder = load_model(repo_id)
            _loaded_models[repo_id] = embedder
            return embedder

embed_texts(texts, *, model_id=None, batch_size=None)

Embed a list of texts using the default model.

Parameters:

Name Type Description Default
texts list[str]

List of strings to embed.

required
model_id str | None

Optional repo id / alias override.

None
batch_size int | None

Optional override for encode batch size.

None

Returns:

Type Description
list[list[float]]

List of embedding vectors (list of floats).

Source code in src/simplevecdb/embeddings/models.py
def embed_texts(
    texts: list[str], *, model_id: str | None = None, batch_size: int | None = None
) -> list[list[float]]:
    """
    Embed a list of texts with the configured (or overridden) model.

    Args:
        texts: List of strings to embed.
        model_id: Optional repo id / alias override.
        batch_size: Optional override for encode batch size.

    Returns:
        List of embedding vectors (list of floats).
    """
    # Nothing to embed -> nothing to return.
    if not texts:
        return []

    embedder = get_embedder(model_id)
    vectors = embedder.encode(
        texts,
        normalize_embeddings=True,
        batch_size=batch_size or config.EMBEDDING_BATCH_SIZE,
        show_progress_bar=False,
    )
    return vectors.tolist()

simplevecdb.embeddings.server

RateLimiter

Token bucket rate limiter per IP/identity with TTL cleanup.

Source code in src/simplevecdb/embeddings/server.py
class RateLimiter:
    """Token bucket rate limiter per IP/identity with TTL cleanup."""

    def __init__(
        self,
        requests_per_minute: int = 60,
        burst: int = 10,
        ttl_seconds: int = 3600,
        max_buckets: int = 10000,
    ):
        self._lock = Lock()
        self._buckets: dict[str, dict[str, float]] = {}
        # Convert the per-minute budget into a per-second refill rate.
        self._rate = requests_per_minute / 60.0
        self._burst = burst
        self._ttl = ttl_seconds
        self._max_buckets = max_buckets
        self._last_cleanup = time.time()

    def _cleanup_stale(self, now: float) -> None:
        """Drop buckets idle for longer than the TTL. Caller must hold the lock."""
        expired = [
            key for key, bucket in self._buckets.items() if now - bucket["last"] > self._ttl
        ]
        for key in expired:
            del self._buckets[key]

    def is_allowed(self, identity: str) -> bool:
        """Consume one token for *identity*; return whether the request may proceed."""
        now = time.time()
        with self._lock:
            # Sweep stale entries every TTL/4 seconds, or sooner if the table
            # has grown past its cap.
            cleanup_due = now - self._last_cleanup > self._ttl / 4
            if cleanup_due or len(self._buckets) > self._max_buckets:
                self._cleanup_stale(now)
                self._last_cleanup = now

            bucket = self._buckets.setdefault(
                identity, {"tokens": self._burst, "last": now}
            )
            # Refill proportionally to idle time, capped at the burst size.
            idle = now - bucket["last"]
            bucket["tokens"] = min(self._burst, bucket["tokens"] + idle * self._rate)
            bucket["last"] = now

            if bucket["tokens"] < 1:
                return False
            bucket["tokens"] -= 1
            return True

is_allowed(identity)

Check if request is allowed and consume a token.

Source code in src/simplevecdb/embeddings/server.py
def is_allowed(self, identity: str) -> bool:
    """Consume one token for *identity*; return whether the request may proceed."""
    now = time.time()
    with self._lock:
        # Opportunistic cleanup: run when TTL/4 seconds have elapsed or the
        # bucket table exceeded its cap.
        if (
            now - self._last_cleanup > self._ttl / 4
            or len(self._buckets) > self._max_buckets
        ):
            self._cleanup_stale(now)
            self._last_cleanup = now

        bucket = self._buckets.setdefault(
            identity, {"tokens": self._burst, "last": now}
        )

        # Refill based on idle time, never exceeding the burst capacity.
        bucket["tokens"] = min(
            self._burst, bucket["tokens"] + (now - bucket["last"]) * self._rate
        )
        bucket["last"] = now

        if bucket["tokens"] < 1:
            return False
        bucket["tokens"] -= 1
        return True

ModelRegistry

In-memory mapping of allowed embedding models.

Source code in src/simplevecdb/embeddings/server.py
class ModelRegistry:
    """In-memory mapping of allowed embedding models."""

    def __init__(self, mapping: dict[str, str], allow_unlisted: bool = True):
        self._mapping = mapping or {"default": DEFAULT_MODEL}
        self._default_alias = "default"
        if self._default_alias not in self._mapping:
            self._mapping[self._default_alias] = DEFAULT_MODEL
        self._repo_ids = set(self._mapping.values())
        self._allow_unlisted = allow_unlisted

    def resolve(self, requested: str | None) -> tuple[str, str]:
        """Return (display_id, repo_id) for a requested alias/model name."""
        if not requested:
            return self._default_alias, self._mapping[self._default_alias]
        if requested in self._mapping:
            return requested, self._mapping[requested]
        if requested in self._repo_ids:
            return requested, requested
        if self._allow_unlisted:
            return requested, requested

        allowed = sorted(set(self._mapping.keys()) | self._repo_ids)
        raise HTTPException(
            status_code=400,
            detail={
                "message": f"Model '{requested}' is not allowed.",
                "allowed": allowed,
            },
        )

    def list_models(self) -> list[dict[str, Any]]:
        """Return OpenAI-compatible model listings."""
        models = []
        seen: set[str] = set()
        for alias, repo in self._mapping.items():
            models.append(
                {
                    "id": alias,
                    "object": "model",
                    "created": 0,
                    "owned_by": "simplevecdb",
                    "metadata": {"repo_id": repo},
                }
            )
            seen.add(alias)
        for repo in self._repo_ids:
            if repo in seen:
                continue
            models.append(
                {
                    "id": repo,
                    "object": "model",
                    "created": 0,
                    "owned_by": "simplevecdb",
                    "metadata": {"repo_id": repo},
                }
            )
        return models

resolve(requested)

Return (display_id, repo_id) for a requested alias/model name.

Source code in src/simplevecdb/embeddings/server.py
def resolve(self, requested: str | None) -> tuple[str, str]:
    """Return (display_id, repo_id) for a requested alias/model name."""
    if not requested:
        return self._default_alias, self._mapping[self._default_alias]
    if requested in self._mapping:
        return requested, self._mapping[requested]
    if requested in self._repo_ids:
        return requested, requested
    if self._allow_unlisted:
        return requested, requested

    allowed = sorted(set(self._mapping.keys()) | self._repo_ids)
    raise HTTPException(
        status_code=400,
        detail={
            "message": f"Model '{requested}' is not allowed.",
            "allowed": allowed,
        },
    )

list_models()

Return OpenAI-compatible model listings.

Source code in src/simplevecdb/embeddings/server.py
def list_models(self) -> list[dict[str, Any]]:
    """Return OpenAI-compatible model listings."""

    def entry(model_id: str, repo: str) -> dict[str, Any]:
        # Shape matches the objects in OpenAI's /v1/models response.
        return {
            "id": model_id,
            "object": "model",
            "created": 0,
            "owned_by": "simplevecdb",
            "metadata": {"repo_id": repo},
        }

    listings = [entry(alias, repo) for alias, repo in self._mapping.items()]
    aliases = set(self._mapping)
    # Also expose bare repo ids, unless an alias already uses that id.
    listings.extend(
        entry(repo, repo) for repo in self._repo_ids if repo not in aliases
    )
    return listings

UsageMeter

Minimal in-memory tracker for request usage statistics.

Source code in src/simplevecdb/embeddings/server.py
class UsageMeter:
    """Minimal in-memory tracker for request usage statistics."""

    def __init__(self) -> None:
        self._lock = Lock()
        self._stats: dict[str, dict[str, float]] = defaultdict(
            lambda: {"requests": 0, "prompt_tokens": 0, "last_request_ts": 0.0}
        )

    def record(self, identity: str, prompt_tokens: int) -> None:
        now = time.time()
        with self._lock:
            bucket = self._stats[identity]
            bucket["requests"] += 1
            bucket["prompt_tokens"] += prompt_tokens
            bucket["last_request_ts"] = now

    def snapshot(self, identity: str | None = None) -> dict[str, dict[str, float]]:
        with self._lock:
            if identity:
                data = self._stats.get(
                    identity,
                    {"requests": 0, "prompt_tokens": 0, "last_request_ts": 0.0},
                )
                return {identity: dict(data)}
            return {key: dict(value) for key, value in self._stats.items()}

health_check() async

Health check endpoint.

Source code in src/simplevecdb/embeddings/server.py
@app.get("/health")
async def health_check():
    """Liveness probe: report that the server is up."""
    payload = {"status": "ok"}
    return payload

authenticate_request(credentials=Security(auth_scheme), api_key_header=Header(default=None, alias='X-API-Key'))

Validate API key if auth is enabled; otherwise return anonymous identity.

Source code in src/simplevecdb/embeddings/server.py
def authenticate_request(
    credentials: HTTPAuthorizationCredentials | None = Security(auth_scheme),
    api_key_header: str | None = Header(default=None, alias="X-API-Key"),
) -> str:
    """Validate API key if auth is enabled; otherwise return anonymous identity.

    The key may arrive via the ``X-API-Key`` header or as a Bearer token.

    Raises:
        HTTPException: 401 when no key is supplied, 403 when it is unknown.
    """
    import hmac  # local import keeps this fix self-contained

    allowed_keys = config.EMBEDDING_SERVER_API_KEYS
    if not allowed_keys:
        # Auth disabled -> all callers share the anonymous identity.
        return "anonymous"

    token = api_key_header or (credentials.credentials if credentials else None)
    if not token:
        raise HTTPException(status_code=401, detail="Missing API key")
    # Constant-time comparison against each allowed key: a plain `in` check
    # short-circuits on the first differing character and can leak key
    # prefixes through response timing. Encode first so compare_digest
    # accepts non-ASCII input without raising.
    token_bytes = token.encode("utf-8")
    if not any(
        hmac.compare_digest(token_bytes, key.encode("utf-8")) for key in allowed_keys
    ):
        raise HTTPException(status_code=403, detail="Invalid API key")
    return token

create_embeddings(request, raw_request, api_identity=Depends(authenticate_request)) async

Create embeddings for the input text(s).

Parameters:

Name Type Description Default
request EmbeddingRequest

EmbeddingRequest containing input text and model.

required

Returns:

Type Description
EmbeddingResponse

EmbeddingResponse with vector data.

Source code in src/simplevecdb/embeddings/server.py
@app.post("/v1/embeddings")
async def create_embeddings(
    request: EmbeddingRequest,
    raw_request: Request,
    api_identity: str = Depends(authenticate_request),
) -> EmbeddingResponse:
    """
    Create embeddings for the input text(s).

    Args:
        request: EmbeddingRequest containing input text and model.
        raw_request: Raw request, used to derive a rate-limit key.
        api_identity: Authenticated identity (API key or "anonymous").

    Returns:
        EmbeddingResponse with vector data.
    """
    # Throttle by API key when authenticated, otherwise by client IP.
    if api_identity != "anonymous":
        rate_key = api_identity
    else:
        rate_key = raw_request.client.host if raw_request.client else "unknown"
    if not rate_limiter.is_allowed(rate_key):
        raise HTTPException(
            status_code=429, detail="Rate limit exceeded. Try again later."
        )

    # Normalize the OpenAI-style input into a flat list of strings.
    payload = request.input
    if isinstance(payload, str):
        texts = [payload]
    elif isinstance(payload, list) and all(isinstance(i, int) for i in payload):
        texts = [str(i) for i in payload]  # token arrays – just stringify
    else:
        texts = [str(item) for item in payload]

    if len(texts) > config.EMBEDDING_SERVER_MAX_REQUEST_ITEMS:
        raise HTTPException(
            status_code=413,
            detail=(
                f"Batch size {len(texts)} exceeds "
                f"EMBEDDING_SERVER_MAX_REQUEST_ITEMS="
                f"{config.EMBEDDING_SERVER_MAX_REQUEST_ITEMS}"
            ),
        )

    resolved_model_name, repo_id = registry.resolve(request.model)

    embeddings: list[list[float]] = []
    if texts:
        try:
            effective_batch = min(
                config.EMBEDDING_BATCH_SIZE,
                config.EMBEDDING_SERVER_MAX_REQUEST_ITEMS,
            )
            embeddings = embed_texts(
                texts, model_id=repo_id, batch_size=effective_batch
            )
        except Exception as e:
            # Full details go to the server log; clients get a generic error.
            _logger.exception("Embedding failed: %s", e)
            raise HTTPException(
                status_code=500,
                detail="Embedding operation failed. Check server logs for details.",
            )

    # Fake token usage (optional – some tools expect it)
    total_tokens = sum(len(t.split()) for t in texts)
    usage_meter.record(api_identity, total_tokens)

    return EmbeddingResponse(
        data=[
            EmbeddingData(embedding=vec, index=idx)
            for idx, vec in enumerate(embeddings)
        ],
        model=resolved_model_name or repo_id,
        usage={"prompt_tokens": total_tokens, "total_tokens": total_tokens},
    )

list_models(api_identity=Depends(authenticate_request)) async

List available embedding models (requires auth when configured).

Source code in src/simplevecdb/embeddings/server.py
@app.get("/v1/models")
async def list_models(
    api_identity: str = Depends(authenticate_request),
) -> dict[str, Any]:
    """List available embedding models (requires auth when configured)."""
    # The dependency has already rejected unauthenticated callers when
    # API keys are configured; the identity itself is unused here.
    del api_identity
    return {"data": registry.list_models(), "object": "list"}

usage(api_identity=Depends(authenticate_request)) async

Return aggregate or per-key usage statistics.

Source code in src/simplevecdb/embeddings/server.py
@app.get("/v1/usage")
async def usage(api_identity: str = Depends(authenticate_request)) -> dict[str, Any]:
    """Return aggregate or per-key usage statistics."""
    # With auth enabled each key sees only its own stats; otherwise expose all.
    if config.EMBEDDING_SERVER_API_KEYS:
        scope = api_identity
    else:
        scope = None
    return {"object": "usage", "data": usage_meter.snapshot(scope)}

run_server(host=None, port=None)

Run the embedding server.

Can be called programmatically or via the simplevecdb-server CLI.

Examples

Run with default settings: $ simplevecdb-server

Override port: $ simplevecdb-server --port 8000

Parameters:

Name Type Description Default
host str | None

Server host (defaults to config.SERVER_HOST).

None
port int | None

Server port (defaults to config.SERVER_PORT).

None
Source code in src/simplevecdb/embeddings/server.py
def run_server(host: str | None = None, port: int | None = None) -> None:
    """Run the embedding server.

    Can be called programmatically or via the ``simplevecdb-server`` CLI.

    Examples
    --------
    Run with default settings:
    $ simplevecdb-server

    Override port:
    $ simplevecdb-server --port 8000

    Args:
        host: Server host (defaults to config.SERVER_HOST).
        port: Server port (defaults to config.SERVER_PORT).
    """
    # Minimal CLI-style override when invoked as a script/entry point.
    # Allows commands like: simplevecdb-server --host 0.0.0.0 --port 8000
    # NOTE(review): `-h` is shorthand for --host here, shadowing the
    # conventional help flag.
    import sys

    argv = sys.argv[1:]
    for i, arg in enumerate(argv):
        if arg in {"--host", "-h"} and i + 1 < len(argv):
            host = argv[i + 1]
        if arg in {"--port", "-p"} and i + 1 < len(argv):
            try:
                port = int(argv[i + 1])
            except ValueError:
                # Previously swallowed silently; surface the bad value while
                # keeping best-effort behavior (fall back to configured port).
                _logger.warning(
                    "Ignoring invalid --port value %r (not an integer).",
                    argv[i + 1],
                )

    host = host or config.SERVER_HOST
    port = port or config.SERVER_PORT

    # Security warnings
    if not config.EMBEDDING_SERVER_API_KEYS:
        _logger.warning(
            "⚠️  No API keys configured (EMBEDDING_SERVER_API_KEYS is empty). "
            "Server is running without authentication. "
            "Set EMBEDDING_SERVER_API_KEYS for production use."
        )
    if host == "0.0.0.0":
        _logger.warning(
            "⚠️  Server binding to all interfaces (0.0.0.0). "
            "This exposes the server to the network. "
            "Use 127.0.0.1 for local-only access."
        )

    uvicorn.run(app, host=host, port=port, log_level="info")