From 4404f59d387606639eb02484e8d119840d25f043 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Wed, 29 Oct 2025 15:10:05 +0530 Subject: [PATCH 01/14] feat(...) --- haystack-core-integrations | 1 + .../components/rankers/fastembed/__init__.py | 3 + .../rankers/fastembed/colbert_reranker.py | 138 ++++++++++++++++++ .../rankers/fastembed/colbert_utils.py | 4 + .../test_fastembed_colbert_reranker.py | 24 +++ 5 files changed, 170 insertions(+) create mode 160000 haystack-core-integrations create mode 100644 integrations/fastembed/haystack_integrations/components/rankers/fastembed/__init__.py create mode 100644 integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py create mode 100644 integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py create mode 100644 tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py diff --git a/haystack-core-integrations b/haystack-core-integrations new file mode 160000 index 0000000000..f8cc0807cc --- /dev/null +++ b/haystack-core-integrations @@ -0,0 +1 @@ +Subproject commit f8cc0807cc33950ca6aa07d51a7b90b7c21e665e diff --git a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/__init__.py b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/__init__.py new file mode 100644 index 0000000000..3ef7d4078b --- /dev/null +++ b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/__init__.py @@ -0,0 +1,3 @@ +from .colbert_reranker import FastembedColbertReranker + +__all__ = ["FastembedColbertReranker"] \ No newline at end of file diff --git a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py new file mode 100644 index 0000000000..6cae46a3ea --- /dev/null +++ b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py 
@@ -0,0 +1,138 @@ +# SPDX-FileCopyrightText: 2025-present deepset GmbH +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from typing import Any, Iterable, List, Optional, Sequence + +from haystack import Document, component + + +@component +class FastembedColbertReranker: + """ + Rerank Documents using ColBERT late-interaction scoring via FastEmbed. + + This component expects a *retrieved* list of Documents (e.g., top 100–500) + and reorders them by ColBERT MaxSim score with respect to the input query. + + Parameters + ---------- + model : str, default: "colbert-ir/colbertv2.0" + The ColBERT-compatible model name to load via FastEmbed. + batch_size : int, default: 16 + Number of documents to encode per batch. + threads : Optional[int], default: None + Number of CPU threads for inference (passed to FastEmbed if supported). + similarity : {"cosine", "dot"}, default: "cosine" + Similarity for token–token interactions inside MaxSim. + normalize : bool, default: True + L2-normalize token embeddings before similarity (needed for cosine). + max_query_tokens : Optional[int], default: None + Truncate/limit tokens on the query side, if desired. + max_doc_tokens : Optional[int], default: None + Truncate/limit tokens on the document side, if desired. + + Notes + ----- + - This is a *reranker*. Use it after a retriever (BM25/dense) with ~100–500 candidates. + - Lives in the FastEmbed integration to avoid new core dependencies. 
+ """ + + def __init__( + self, + model: str = "colbert-ir/colbertv2.0", + batch_size: int = 16, + threads: Optional[int] = None, + similarity: str = "cosine", + normalize: bool = True, + max_query_tokens: Optional[int] = None, + max_doc_tokens: Optional[int] = None, + ): + self.model = model + self.batch_size = batch_size + self.threads = threads + self.similarity = similarity + self.normalize = normalize + self.max_query_tokens = max_query_tokens + self.max_doc_tokens = max_doc_tokens + + # Lazy-loaded in warm_up() + self._q_encoder = None # type: ignore[var-annotated] + self._d_encoder = None # type: ignore[var-annotated] + self._ready = False + + def warm_up(self): + """ + Load FastEmbed encoders and do a tiny dry run to JIT/initialize backends. + + We keep imports here to avoid importing fastembed unless needed. + """ + if self._ready: + return + + try: + # NOTE: We'll wire these real classes in Step 3 when we implement encoding. + # from fastembed import LateInteractionTextEmbedding + # self._q_encoder = LateInteractionTextEmbedding(self.model, mode="query", threads=self.threads) + # self._d_encoder = LateInteractionTextEmbedding(self.model, mode="document", threads=self.threads) + + # Minimal stub for now: + self._q_encoder = object() + self._d_encoder = object() + + # Optional: perform a tiny no-op dry-run later once encoders are real. 
+ self._ready = True + except Exception as e: + raise RuntimeError( + f"Failed to initialize FastEmbed ColBERT encoders for model '{self.model}': {e}" + ) from e + + def _ensure_ready(self): + if not self._ready: + # Allow implicit warm-up on first run for ergonomics + self.warm_up() + + def _get_texts(self, documents: Sequence[Document]) -> List[str]: + # Prefer Document.content; fall back to empty string to avoid crashes + return [doc.content or "" for doc in documents] + + def run( + self, + query: str, + documents: Sequence[Document], + top_k: Optional[int] = None, + ) -> dict[str, Any]: + """ + Rerank the input documents with respect to the query using ColBERT MaxSim. + + Parameters + ---------- + query : str + The user query. + documents : Sequence[Document] + Candidate documents to rerank (typically ~100–500). + top_k : Optional[int] + If given, only the top-k reranked documents are returned. + + Returns + ------- + dict + {"documents": List[Document]} with `Document.score` set to the ColBERT score. + """ + self._ensure_ready() + + if not documents: + return {"documents": []} + + # === Placeholder logic (wire real scoring in Step 3) === + # For now we keep the input order and set a dummy score (0.0). + # Next step will replace this with actual FastEmbed + MaxSim scoring. + reranked = list(documents) + for d in reranked: + d.score = 0.0 + + if top_k is not None: + reranked = reranked[:top_k] + + return {"documents": reranked} diff --git a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py new file mode 100644 index 0000000000..dd176e2e93 --- /dev/null +++ b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py @@ -0,0 +1,4 @@ +from __future__ import annotations + +# We’ll implement real MaxSim scoring utilities here in Step 3. +# Keeping the file so imports are stable when we wire things up. 
\ No newline at end of file diff --git a/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py b/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py new file mode 100644 index 0000000000..a8e3c07409 --- /dev/null +++ b/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py @@ -0,0 +1,24 @@ +from haystack import Document +from integrations.fastembed.haystack_integrations.components.rankers.fastembed import ( + FastembedColbertReranker, +) + + +def test_symbol_imports(): + # Just ensures the class is importable before we add real tests + rr = FastembedColbertReranker() + assert hasattr(rr, "run") + + +def test_run_no_documents_returns_empty(): + rr = FastembedColbertReranker() + result = rr.run(query="test", documents=[]) + assert result["documents"] == [] + + +def test_run_sets_default_scores_and_preserves_length(): + docs = [Document(content="a"), Document(content="b")] + rr = FastembedColbertReranker() + out = rr.run(query="q", documents=docs) + assert len(out["documents"]) == 2 + assert all(d.score is not None for d in out["documents"]) \ No newline at end of file From 7ba9f2ee05d560d990bc1fc1c622f2480161d4d3 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Wed, 29 Oct 2025 15:24:37 +0530 Subject: [PATCH 02/14] feat(fastembed): implement ColBERT MaxSim reranking via FastEmbed (CPU, cosine/dot) --- .pre-commit-config.yaml | 5 + .../rankers/fastembed/colbert_reranker.py | 167 +++++++++++++++--- .../test_fastembed_colbert_reranker.py | 71 +++++++- 3 files changed, 209 insertions(+), 34 deletions(-) create mode 100644 .pre-commit-config.yaml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000000..a2d5bed862 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,5 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.6.0 + hooks: + - id: check-merge-conflict diff --git 
a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index 6cae46a3ea..8caf8af3ff 100644 --- a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -5,9 +5,55 @@ from typing import Any, Iterable, List, Optional, Sequence +import math +import numpy as np from haystack import Document, component +def _l2_normalize_rows(mat: np.ndarray) -> np.ndarray: + """ + L2-normalize each row vector in a 2D array. Safe for zero rows. + """ + if mat.ndim != 2: + raise ValueError(f"Expected 2D matrix, got shape {mat.shape}") + norms = np.linalg.norm(mat, axis=1, keepdims=True) + # avoid division by zero + norms = np.where(norms == 0.0, 1.0, norms) + return mat / norms + + +def _maxsim_score(q_mat: np.ndarray, d_mat: np.ndarray, *, similarity: str = "cosine", normalize: bool = True) -> float: + """ + ColBERT late-interaction (MaxSim) score. + + For each query token vector, take the maximum similarity across all doc token vectors, + then sum over query tokens. + + Similarity is either cosine (row-wise normalized dot) or raw dot. + """ + if q_mat.size == 0 or d_mat.size == 0: + return 0.0 + + if similarity not in ("cosine", "dot"): + raise ValueError(f"Unsupported similarity '{similarity}'. 
Use 'cosine' or 'dot'.") + + if similarity == "cosine" and normalize: + q = _l2_normalize_rows(q_mat) + d = _l2_normalize_rows(d_mat) + else: + q = q_mat + d = d_mat + + # [Lq, D] x [D, Ld] -> [Lq, Ld] + sim = q @ d.T # numpy matmul + + # Max over doc tokens for each query token, then sum + # Guard against empty axis when Ld==0 (handled above by size check) + max_per_q = sim.max(axis=1) + score = float(max_per_q.sum()) + return score + + @component class FastembedColbertReranker: """ @@ -29,9 +75,9 @@ class FastembedColbertReranker: normalize : bool, default: True L2-normalize token embeddings before similarity (needed for cosine). max_query_tokens : Optional[int], default: None - Truncate/limit tokens on the query side, if desired. + Truncate/limit tokens on the query side, if supported by the encoder. max_doc_tokens : Optional[int], default: None - Truncate/limit tokens on the document side, if desired. + Truncate/limit tokens on the document side, if supported by the encoder. Notes ----- @@ -58,45 +104,96 @@ def __init__( self.max_doc_tokens = max_doc_tokens # Lazy-loaded in warm_up() - self._q_encoder = None # type: ignore[var-annotated] - self._d_encoder = None # type: ignore[var-annotated] + self._encoder = None # LateInteractionTextEmbedding self._ready = False def warm_up(self): """ - Load FastEmbed encoders and do a tiny dry run to JIT/initialize backends. - - We keep imports here to avoid importing fastembed unless needed. + Load FastEmbed encoders and do a tiny dry run to initialize backends. """ if self._ready: return try: - # NOTE: We'll wire these real classes in Step 3 when we implement encoding. 
- # from fastembed import LateInteractionTextEmbedding - # self._q_encoder = LateInteractionTextEmbedding(self.model, mode="query", threads=self.threads) - # self._d_encoder = LateInteractionTextEmbedding(self.model, mode="document", threads=self.threads) - - # Minimal stub for now: - self._q_encoder = object() - self._d_encoder = object() + # Lazy import to avoid hard dependency outside this integration + from fastembed import LateInteractionTextEmbedding # type: ignore + + # LateInteractionTextEmbedding exposes .query_embed() and .embed() generators + # Some fastembed versions use 'model_name' kw, others accept positional. + # We'll pass by name for clarity. + self._encoder = LateInteractionTextEmbedding( + model_name=self.model, + threads=self.threads, + # Some fastembed versions accept kwargs like max_tokens; if not, they're ignored safely. + max_tokens_query=self.max_query_tokens, # optional / best-effort + max_tokens_document=self.max_doc_tokens, # optional / best-effort + ) + + # Tiny dry-run to trigger onnx initialization (doesn't fail if offline) + _ = next(self._encoder.query_embed(["warmup"]), None) + _ = next(self._encoder.embed(["warmup"]), None) - # Optional: perform a tiny no-op dry-run later once encoders are real. self._ready = True + except ModuleNotFoundError as e: + raise RuntimeError( + "fastembed is not installed. 
Please install the FastEmbed integration:\n\n" + " pip install fastembed-haystack\n" + ) from e except Exception as e: raise RuntimeError( - f"Failed to initialize FastEmbed ColBERT encoders for model '{self.model}': {e}" + f"Failed to initialize FastEmbed ColBERT encoder for model '{self.model}': {e}" ) from e def _ensure_ready(self): if not self._ready: - # Allow implicit warm-up on first run for ergonomics self.warm_up() - def _get_texts(self, documents: Sequence[Document]) -> List[str]: + @staticmethod + def _get_texts(documents: Sequence[Document]) -> List[str]: # Prefer Document.content; fall back to empty string to avoid crashes return [doc.content or "" for doc in documents] + def _encode_query(self, text: str) -> np.ndarray: + """ + Encode a single query into a [Lq, D] numpy array of token embeddings. + """ + assert self._encoder is not None + # .query_embed returns an iterator/generator over embeddings + gen = self._encoder.query_embed([text]) + arr = next(gen, None) + if arr is None: + return np.zeros((0, 0), dtype=np.float32) + return np.asarray(arr, dtype=np.float32) + + def _encode_docs_batched(self, texts: List[str]) -> List[np.ndarray]: + """ + Encode documents into token-embedding matrices, batched. + Returns a list of [Ld, D] arrays aligned with `texts`. + """ + assert self._encoder is not None + + results: List[np.ndarray] = [] + n = len(texts) + if n == 0: + return results + + for start in range(0, n, self.batch_size): + end = min(start + self.batch_size, n) + batch = texts[start:end] + # .embed returns a generator yielding one embedding per input string + for emb in self._encoder.embed(batch): + if emb is None: + results.append(np.zeros((0, 0), dtype=np.float32)) + else: + results.append(np.asarray(emb, dtype=np.float32)) + + # Safety: ensure alignment + if len(results) != n: + raise RuntimeError( + f"Encoder returned {len(results)} embeddings for {n} documents; batch logic out of sync." 
+ ) + return results + def run( self, query: str, @@ -122,17 +219,31 @@ def run( """ self._ensure_ready() - if not documents: + docs_list = list(documents) + if not docs_list: return {"documents": []} - # === Placeholder logic (wire real scoring in Step 3) === - # For now we keep the input order and set a dummy score (0.0). - # Next step will replace this with actual FastEmbed + MaxSim scoring. - reranked = list(documents) - for d in reranked: - d.score = 0.0 + # Encode query once + q_mat = self._encode_query(query) + + # Encode documents (batched) + doc_texts = self._get_texts(docs_list) + doc_mats = self._encode_docs_batched(doc_texts) + + # Compute scores + scores: List[float] = [] + for d_mat in doc_mats: + score = _maxsim_score(q_mat, d_mat, similarity=self.similarity, normalize=self.normalize) + scores.append(score) + + # Attach and sort (descending) + for d, s in zip(docs_list, scores): + d.score = s + + docs_list.sort(key=lambda d: (d.score if d.score is not None else float("-inf")), reverse=True) + # Slice top_k if requested if top_k is not None: - reranked = reranked[:top_k] + docs_list = docs_list[:top_k] - return {"documents": reranked} + return {"documents": docs_list} \ No newline at end of file diff --git a/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py b/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py index a8e3c07409..bef26cf87f 100644 --- a/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py +++ b/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py @@ -1,24 +1,83 @@ +import numpy as np from haystack import Document from integrations.fastembed.haystack_integrations.components.rankers.fastembed import ( FastembedColbertReranker, ) +from integrations.fastembed.haystack_integrations.components.rankers.fastembed.colbert_reranker import ( + _maxsim_score, _l2_normalize_rows +) def test_symbol_imports(): - # Just ensures the class is importable before we add real tests rr = 
FastembedColbertReranker() assert hasattr(rr, "run") +def test_l2_normalize_rows_shapes_and_zeros(): + x = np.array([[3.0, 4.0], [0.0, 0.0]], dtype=np.float32) + y = _l2_normalize_rows(x) + # First row becomes unit norm, second row stays zeros (safe divide) + assert y.shape == x.shape + assert np.isclose(np.linalg.norm(y[0]), 1.0, atol=1e-6) + assert np.allclose(y[1], np.zeros_like(y[1])) + + +def test_maxsim_score_simple_cosine(): + # Query has two tokens; doc has three tokens + q = np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32) # e1, e2 + d = np.array([[1.0, 0.0], [0.5, 0.5], [0.0, 1.0]], dtype=np.float32) # e1, mid, e2 + + # With cosine + normalization, MaxSim per q-token is 1.0 for both -> sum = 2.0 + score = _maxsim_score(q, d, similarity="cosine", normalize=True) + assert np.isclose(score, 2.0, atol=1e-6) + + def test_run_no_documents_returns_empty(): rr = FastembedColbertReranker() result = rr.run(query="test", documents=[]) assert result["documents"] == [] -def test_run_sets_default_scores_and_preserves_length(): - docs = [Document(content="a"), Document(content="b")] +def test_run_topk_slices_without_fastembed(monkeypatch): + """ + We monkeypatch the encoder methods to avoid requiring FastEmbed models in CI. + This also checks sorting by score and top_k behavior deterministically. 
+ """ rr = FastembedColbertReranker() - out = rr.run(query="q", documents=docs) - assert len(out["documents"]) == 2 - assert all(d.score is not None for d in out["documents"]) \ No newline at end of file + + # Fake "ready" state (skip warm_up + fastembed import) + rr._ready = True + rr._encoder = object() + + # Query encoding -> 2x2 (arbitrary but fixed) + def fake_encode_query(text: str): + return np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32) + + # Doc encodings -> create three docs with different alignments + def fake_encode_docs(texts): + mats = [] + for t in texts: + if "best" in t: + mats.append(np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32)) # high + elif "ok" in t: + mats.append(np.array([[0.8, 0.2], [0.2, 0.8]], dtype=np.float32)) # medium + else: + mats.append(np.array([[1.0, 0.0]], dtype=np.float32)) # lower + return mats + + rr._encode_query = fake_encode_query # type: ignore + rr._encode_docs_batched = fake_encode_docs # type: ignore + + docs = [ + Document(content="this is lower"), + Document(content="this is ok"), + Document(content="this is best"), + ] + out = rr.run(query="q", documents=docs, top_k=2) + ranked = out["documents"] + + # Expect "best" first, then "ok" + assert len(ranked) == 2 + assert "best" in ranked[0].content + assert "ok" in ranked[1].content + assert ranked[0].score >= ranked[1].score \ No newline at end of file From fdc0de48897c27d3e3f9e07de60cfa5ba4afdca8 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Wed, 29 Oct 2025 17:30:11 +0530 Subject: [PATCH 03/14] chore(fastembed): parameter validation, stable sorting, robust warm-up; add tests --- .pre-commit-config.yaml | 5 - .../components/rankers/fastembed/__init__.py | 2 +- .../rankers/fastembed/colbert_reranker.py | 238 ++++++++---------- .../rankers/fastembed/colbert_utils.py | 2 +- .../test_fastembed_colbert_reranker.py | 48 +++- 5 files changed, 149 insertions(+), 146 deletions(-) delete mode 100644 .pre-commit-config.yaml diff --git a/.pre-commit-config.yaml 
b/.pre-commit-config.yaml deleted file mode 100644 index a2d5bed862..0000000000 --- a/.pre-commit-config.yaml +++ /dev/null @@ -1,5 +0,0 @@ -repos: - - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.6.0 - hooks: - - id: check-merge-conflict diff --git a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/__init__.py b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/__init__.py index 3ef7d4078b..b1999306c6 100644 --- a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/__init__.py +++ b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/__init__.py @@ -1,3 +1,3 @@ from .colbert_reranker import FastembedColbertReranker -__all__ = ["FastembedColbertReranker"] \ No newline at end of file +__all__ = ["FastembedColbertReranker"] diff --git a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index 8caf8af3ff..3dbc0f0b70 100644 --- a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -3,39 +3,43 @@ from __future__ import annotations -from typing import Any, Iterable, List, Optional, Sequence +from typing import Any, List, Optional, Sequence -import math import numpy as np from haystack import Document, component +_TWO_D = 2 # avoid PLR2004 "magic number" warning +_VALID_SIMS = {"cosine", "dot"} + def _l2_normalize_rows(mat: np.ndarray) -> np.ndarray: """ L2-normalize each row vector in a 2D array. Safe for zero rows. 
""" - if mat.ndim != 2: - raise ValueError(f"Expected 2D matrix, got shape {mat.shape}") + if mat.ndim != _TWO_D: + msg = f"Expected 2D matrix, got shape {mat.shape}" + raise ValueError(msg) norms = np.linalg.norm(mat, axis=1, keepdims=True) - # avoid division by zero - norms = np.where(norms == 0.0, 1.0, norms) + norms = np.where(norms == 0.0, 1.0, norms) # avoid division by zero return mat / norms -def _maxsim_score(q_mat: np.ndarray, d_mat: np.ndarray, *, similarity: str = "cosine", normalize: bool = True) -> float: +def _maxsim_score( + q_mat: np.ndarray, + d_mat: np.ndarray, + *, + similarity: str = "cosine", + normalize: bool = True, +) -> float: """ - ColBERT late-interaction (MaxSim) score. - - For each query token vector, take the maximum similarity across all doc token vectors, - then sum over query tokens. - - Similarity is either cosine (row-wise normalized dot) or raw dot. + ColBERT late-interaction (MaxSim) score: + For each query token vector, take the maximum similarity across all doc token vectors, then sum. """ if q_mat.size == 0 or d_mat.size == 0: return 0.0 - - if similarity not in ("cosine", "dot"): - raise ValueError(f"Unsupported similarity '{similarity}'. Use 'cosine' or 'dot'.") + if similarity not in _VALID_SIMS: + msg = f"Unsupported similarity '{similarity}'. Use 'cosine' or 'dot'." 
+ raise ValueError(msg) if similarity == "cosine" and normalize: q = _l2_normalize_rows(q_mat) @@ -44,14 +48,10 @@ def _maxsim_score(q_mat: np.ndarray, d_mat: np.ndarray, *, similarity: str = "co q = q_mat d = d_mat - # [Lq, D] x [D, Ld] -> [Lq, Ld] - sim = q @ d.T # numpy matmul - - # Max over doc tokens for each query token, then sum - # Guard against empty axis when Ld==0 (handled above by size check) - max_per_q = sim.max(axis=1) - score = float(max_per_q.sum()) - return score + # [Lq, D] @ [D, Ld] -> [Lq, Ld] + sim = q @ d.T + max_per_q = sim.max(axis=1) # max over doc tokens for each query token + return float(max_per_q.sum()) @component @@ -61,39 +61,17 @@ class FastembedColbertReranker: This component expects a *retrieved* list of Documents (e.g., top 100–500) and reorders them by ColBERT MaxSim score with respect to the input query. - - Parameters - ---------- - model : str, default: "colbert-ir/colbertv2.0" - The ColBERT-compatible model name to load via FastEmbed. - batch_size : int, default: 16 - Number of documents to encode per batch. - threads : Optional[int], default: None - Number of CPU threads for inference (passed to FastEmbed if supported). - similarity : {"cosine", "dot"}, default: "cosine" - Similarity for token–token interactions inside MaxSim. - normalize : bool, default: True - L2-normalize token embeddings before similarity (needed for cosine). - max_query_tokens : Optional[int], default: None - Truncate/limit tokens on the query side, if supported by the encoder. - max_doc_tokens : Optional[int], default: None - Truncate/limit tokens on the document side, if supported by the encoder. - - Notes - ----- - - This is a *reranker*. Use it after a retriever (BM25/dense) with ~100–500 candidates. - - Lives in the FastEmbed integration to avoid new core dependencies. 
""" def __init__( self, model: str = "colbert-ir/colbertv2.0", batch_size: int = 16, - threads: Optional[int] = None, + threads: int | None = None, similarity: str = "cosine", normalize: bool = True, - max_query_tokens: Optional[int] = None, - max_doc_tokens: Optional[int] = None, + max_query_tokens: int | None = None, + max_doc_tokens: int | None = None, ): self.model = model self.batch_size = batch_size @@ -103,147 +81,139 @@ def __init__( self.max_query_tokens = max_query_tokens self.max_doc_tokens = max_doc_tokens + if similarity not in _VALID_SIMS: + msg = f"similarity must be one of {_VALID_SIMS}, got {similarity!r}" + raise ValueError(msg) + if batch_size <= 0: + msg = f"batch_size must be > 0, got {batch_size}" + raise ValueError(msg) + # Lazy-loaded in warm_up() self._encoder = None # LateInteractionTextEmbedding self._ready = False def warm_up(self): - """ - Load FastEmbed encoders and do a tiny dry run to initialize backends. - """ if self._ready: return - try: - # Lazy import to avoid hard dependency outside this integration from fastembed import LateInteractionTextEmbedding # type: ignore - # LateInteractionTextEmbedding exposes .query_embed() and .embed() generators - # Some fastembed versions use 'model_name' kw, others accept positional. - # We'll pass by name for clarity. - self._encoder = LateInteractionTextEmbedding( - model_name=self.model, - threads=self.threads, - # Some fastembed versions accept kwargs like max_tokens; if not, they're ignored safely. 
- max_tokens_query=self.max_query_tokens, # optional / best-effort - max_tokens_document=self.max_doc_tokens, # optional / best-effort - ) - - # Tiny dry-run to trigger onnx initialization (doesn't fail if offline) - _ = next(self._encoder.query_embed(["warmup"]), None) - _ = next(self._encoder.embed(["warmup"]), None) + kwargs = {"model_name": self.model, "threads": self.threads} + # best-effort: only pass token kwargs if supported + for k, v in { + "max_tokens_query": self.max_query_tokens, + "max_tokens_document": self.max_doc_tokens, + }.items(): + if v is not None: + kwargs[k] = v + + try: + self._encoder = LateInteractionTextEmbedding(**kwargs) + except TypeError: + # remove unknown kwargs & retry + safe_kwargs = {"model_name": self.model, "threads": self.threads} + self._encoder = LateInteractionTextEmbedding(**safe_kwargs) + + # tiny dry runs (guard generators possibly being empty) + gen_q = self._encoder.query_embed(["warmup"]) + next(gen_q, None) + gen_d = self._encoder.embed(["warmup"]) + next(gen_d, None) self._ready = True except ModuleNotFoundError as e: - raise RuntimeError( + msg = ( "fastembed is not installed. Please install the FastEmbed integration:\n\n" " pip install fastembed-haystack\n" - ) from e - except Exception as e: + ) raise RuntimeError( - f"Failed to initialize FastEmbed ColBERT encoder for model '{self.model}': {e}" + msg ) from e + except Exception as e: + msg = f"Failed to initialize FastEmbed ColBERT encoder for model '{self.model}': {e}" + raise RuntimeError(msg) from e def _ensure_ready(self): if not self._ready: self.warm_up() @staticmethod - def _get_texts(documents: Sequence[Document]) -> List[str]: + def _get_texts(documents: Sequence[Document]) -> list[str]: # Prefer Document.content; fall back to empty string to avoid crashes return [doc.content or "" for doc in documents] def _encode_query(self, text: str) -> np.ndarray: - """ - Encode a single query into a [Lq, D] numpy array of token embeddings. 
- """ assert self._encoder is not None - # .query_embed returns an iterator/generator over embeddings - gen = self._encoder.query_embed([text]) - arr = next(gen, None) + arr = next(self._encoder.query_embed([text]), None) if arr is None: return np.zeros((0, 0), dtype=np.float32) - return np.asarray(arr, dtype=np.float32) + a = np.asarray(arr, dtype=np.float32) + if a.ndim != 2: + a = a.reshape(-1, a.shape[-1]) # best-effort + return a - def _encode_docs_batched(self, texts: List[str]) -> List[np.ndarray]: - """ - Encode documents into token-embedding matrices, batched. - Returns a list of [Ld, D] arrays aligned with `texts`. - """ + def _encode_docs_batched(self, texts: list[str]) -> list[np.ndarray]: assert self._encoder is not None - - results: List[np.ndarray] = [] - n = len(texts) - if n == 0: - return results - - for start in range(0, n, self.batch_size): - end = min(start + self.batch_size, n) - batch = texts[start:end] - # .embed returns a generator yielding one embedding per input string + results: list[np.ndarray] = [] + for start in range(0, len(texts), self.batch_size): + batch = texts[start : start + self.batch_size] for emb in self._encoder.embed(batch): if emb is None: results.append(np.zeros((0, 0), dtype=np.float32)) else: - results.append(np.asarray(emb, dtype=np.float32)) - - # Safety: ensure alignment - if len(results) != n: - raise RuntimeError( - f"Encoder returned {len(results)} embeddings for {n} documents; batch logic out of sync." - ) + a = np.asarray(emb, dtype=np.float32) + if a.ndim != 2: + a = a.reshape(-1, a.shape[-1]) + results.append(a) + if len(results) != len(texts): + msg = f"Encoder returned {len(results)} embeddings for {len(texts)} documents." + raise RuntimeError(msg) return results def run( self, query: str, documents: Sequence[Document], - top_k: Optional[int] = None, + top_k: int | None = None, ) -> dict[str, Any]: - """ - Rerank the input documents with respect to the query using ColBERT MaxSim. 
- - Parameters - ---------- - query : str - The user query. - documents : Sequence[Document] - Candidate documents to rerank (typically ~100–500). - top_k : Optional[int] - If given, only the top-k reranked documents are returned. - - Returns - ------- - dict - {"documents": List[Document]} with `Document.score` set to the ColBERT score. - """ self._ensure_ready() docs_list = list(documents) if not docs_list: return {"documents": []} - # Encode query once - q_mat = self._encode_query(query) + if top_k is not None and top_k < 0: + msg = f"top_k must be >= 0, got {top_k}" + raise ValueError(msg) - # Encode documents (batched) + # keep original order indices for a stable tie-break + for i, d in enumerate(docs_list): + if getattr(d, "meta", None) is None: + d.meta = {} + d.meta.setdefault("_orig_idx", i) + + # --- actual scoring --- + q_mat = self._encode_query(query) doc_texts = self._get_texts(docs_list) doc_mats = self._encode_docs_batched(doc_texts) - # Compute scores - scores: List[float] = [] - for d_mat in doc_mats: - score = _maxsim_score(q_mat, d_mat, similarity=self.similarity, normalize=self.normalize) - scores.append(score) + for d, d_mat in zip(docs_list, doc_mats): + d.score = _maxsim_score(q_mat, d_mat, similarity=self.similarity, normalize=self.normalize) - # Attach and sort (descending) - for d, s in zip(docs_list, scores): - d.score = s + # sort by score desc; deterministic tie-break by original index + docs_list.sort( + key=lambda d: ( + d.score if d.score is not None else float("-inf"), + -d.meta.get("_orig_idx", 0), + ), + reverse=True, + ) - docs_list.sort(key=lambda d: (d.score if d.score is not None else float("-inf")), reverse=True) + # strip helper key + for d in docs_list: + d.meta.pop("_orig_idx", None) - # Slice top_k if requested if top_k is not None: docs_list = docs_list[:top_k] - return {"documents": docs_list} \ No newline at end of file + return {"documents": docs_list} diff --git 
a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py index dd176e2e93..c3669a808f 100644 --- a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py +++ b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py @@ -1,4 +1,4 @@ from __future__ import annotations # We’ll implement real MaxSim scoring utilities here in Step 3. -# Keeping the file so imports are stable when we wire things up. \ No newline at end of file +# Keeping the file so imports are stable when we wire things up. diff --git a/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py b/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py index bef26cf87f..ac717550a7 100644 --- a/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py +++ b/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py @@ -1,10 +1,12 @@ import numpy as np +import pytest from haystack import Document from integrations.fastembed.haystack_integrations.components.rankers.fastembed import ( FastembedColbertReranker, ) from integrations.fastembed.haystack_integrations.components.rankers.fastembed.colbert_reranker import ( - _maxsim_score, _l2_normalize_rows + _maxsim_score, + _l2_normalize_rows, ) @@ -58,11 +60,11 @@ def fake_encode_docs(texts): mats = [] for t in texts: if "best" in t: - mats.append(np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32)) # high + mats.append(np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32)) # high elif "ok" in t: - mats.append(np.array([[0.8, 0.2], [0.2, 0.8]], dtype=np.float32)) # medium + mats.append(np.array([[0.8, 0.2], [0.2, 0.8]], dtype=np.float32)) # medium else: - mats.append(np.array([[1.0, 0.0]], dtype=np.float32)) # lower + mats.append(np.array([[1.0, 0.0]], dtype=np.float32)) # lower return mats rr._encode_query = 
fake_encode_query # type: ignore @@ -80,4 +82,40 @@ def fake_encode_docs(texts): assert len(ranked) == 2 assert "best" in ranked[0].content assert "ok" in ranked[1].content - assert ranked[0].score >= ranked[1].score \ No newline at end of file + assert ranked[0].score >= ranked[1].score + + +def test_param_validation(): + with pytest.raises(ValueError): + FastembedColbertReranker(batch_size=0) + with pytest.raises(ValueError): + FastembedColbertReranker(similarity="euclid") # unsupported + + +def test_topk_validation(): + rr = FastembedColbertReranker() + with pytest.raises(ValueError): + rr.run(query="q", documents=[Document(content="x")], top_k=-1) + + +def test_stable_tie_break(monkeypatch): + rr = FastembedColbertReranker() + rr._ready = True + rr._encoder = object() + + def fake_q(_): # same query vectors + return np.eye(2, dtype=np.float32) + + def fake_docs(texts): + # craft docs with identical scores → tie + mat = np.eye(2, dtype=np.float32) + return [mat for _ in texts] + + rr._encode_query = fake_q # type: ignore + rr._encode_docs_batched = fake_docs # type: ignore + + docs = [Document(content=f"doc{i}") for i in range(4)] + out = rr.run(query="q", documents=docs) + ranked = out["documents"] + # Ensure length preserved and deterministic order + assert [d.content for d in ranked] == ["doc3", "doc2", "doc1", "doc0"] # due to tie-break we defined From b555998ffbeb77d8d7012ed45d6ba0cdc93a4a32 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 30 Oct 2025 12:25:01 +0530 Subject: [PATCH 04/14] feat(fastembed): add ColBERT late-interaction reranker; tests, README, CHANGELOG, example --- examples/fastembed_colbert_reranker.py | 20 +++++++++++++++++++ integrations/fastembed/CHANGELOG.md | 16 +++++++++++++++ integrations/fastembed/README.md | 20 +++++++++++++++++++ .../src/haystack_integrations/__init__.py | 0 .../components/__init__.py | 0 .../components/rankers/__init__.py | 0 .../components/rankers/fastembed/__init__.py | 4 ++-- 
.../rankers/fastembed/colbert_reranker.py | 0 8 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 examples/fastembed_colbert_reranker.py create mode 100644 integrations/fastembed/src/haystack_integrations/__init__.py create mode 100644 integrations/fastembed/src/haystack_integrations/components/__init__.py create mode 100644 integrations/fastembed/src/haystack_integrations/components/rankers/__init__.py rename integrations/fastembed/{ => src}/haystack_integrations/components/rankers/fastembed/colbert_reranker.py (100%) diff --git a/examples/fastembed_colbert_reranker.py b/examples/fastembed_colbert_reranker.py new file mode 100644 index 0000000000..d2b853f16a --- /dev/null +++ b/examples/fastembed_colbert_reranker.py @@ -0,0 +1,20 @@ +from haystack import Document +from haystack.document_stores.in_memory import InMemoryDocumentStore +from haystack.components.retrievers.in_memory import InMemoryBM25Retriever +from haystack_integrations.components.rankers.fastembed import FastembedColbertReranker + +store = InMemoryDocumentStore() +store.write_documents([ + Document(content="ColBERT uses late interaction over token embeddings."), + Document(content="BM25 is a classic sparse retrieval model."), + Document(content="Cross-encoders directly score query–document pairs."), +]) + +retriever = InMemoryBM25Retriever(document_store=store) +reranker = FastembedColbertReranker(model="colbert-ir/colbertv2.0") + +query = "late interaction reranking" +cand = retriever.run(query=query, top_k=200) +out = reranker.run(query=query, documents=cand["documents"], top_k=5) +for i, d in enumerate(out["documents"], 1): + print(i, round(d.score, 3), d.content) diff --git a/integrations/fastembed/CHANGELOG.md b/integrations/fastembed/CHANGELOG.md index d771f8c2fb..89d4af3ddc 100644 --- a/integrations/fastembed/CHANGELOG.md +++ b/integrations/fastembed/CHANGELOG.md @@ -1,5 +1,21 @@ # Changelog +## [Unreleased] + +### 🚀 Features +- Add `FastembedColbertReranker`: ColBERT 
late-interaction (MaxSim) reranker for 1st-stage candidates (100–500). (#PR_NUMBER) + +### 📚 Documentation +- FastEmbed docs: “ColBERT Reranker” section with quickstart + pipeline example. (#PR_NUMBER) +- Integration README: add bullet for `FastembedColbertReranker` and links. (#PR_NUMBER) + +### 🧪 Testing +- Unit tests for MaxSim math, parameter validation, stable tie-break, and monkeypatched ranking (no model download). (#PR_NUMBER) + +### 🧹 Chores +- Parameter validation, robust warm-up for fastembed API drift, stable sorting on score ties. (#PR_NUMBER) + + ## [integrations/fastembed-v1.5.0] - 2025-06-17 ### 🐛 Bug Fixes diff --git a/integrations/fastembed/README.md b/integrations/fastembed/README.md index 65596aa462..3ee2edbe8d 100644 --- a/integrations/fastembed/README.md +++ b/integrations/fastembed/README.md @@ -6,6 +6,26 @@ - [Integration page](https://haystack.deepset.ai/integrations/fastembed) - [Changelog](https://github.com/deepset-ai/haystack-core-integrations/blob/main/integrations/fastembed/CHANGELOG.md) +## Components + +- **FastembedTextEmbedder / FastembedDocumentEmbedder / Sparse embedders** – embedding components for dense and sparse retrieval (see docs). +- **FastembedRanker** – cross-encoder reranker (see docs). +- **FastembedColbertReranker** – *new* ColBERT late-interaction (MaxSim) reranker for reordering 100–500 candidates on CPU via ONNX. + → See the docs for installation and examples. 
+ +### Documentation + +- Integration page: https://haystack.deepset.ai/integrations/fastembed +- Component docs: + - Fastembed embedders: https://docs.haystack.deepset.ai/docs/fastembedtextembedder + - FastembedRanker (cross-encoder): https://docs.haystack.deepset.ai/docs/fastembedranker + - **FastembedColbertReranker (ColBERT)**: *[link to your new docs section]* + +### Install + +```bash +pip install fastembed-haystack +``` --- ## Contributing diff --git a/integrations/fastembed/src/haystack_integrations/__init__.py b/integrations/fastembed/src/haystack_integrations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/fastembed/src/haystack_integrations/components/__init__.py b/integrations/fastembed/src/haystack_integrations/components/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/__init__.py b/integrations/fastembed/src/haystack_integrations/components/rankers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/__init__.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/__init__.py index ece5e858b9..b1999306c6 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/__init__.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/__init__.py @@ -1,3 +1,3 @@ -from .ranker import FastembedRanker +from .colbert_reranker import FastembedColbertReranker -__all__ = ["FastembedRanker"] +__all__ = ["FastembedColbertReranker"] diff --git a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py similarity index 100% rename from
integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_reranker.py rename to integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py From c4567f91f9ee58de0dc3edcd27cdeef10e954c79 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 30 Oct 2025 12:43:38 +0530 Subject: [PATCH 05/14] feat(fastembed): add ColBERT Reranker Signed-off-by: Arya Tayshete --- integrations/fastembed/CHANGELOG.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/integrations/fastembed/CHANGELOG.md b/integrations/fastembed/CHANGELOG.md index 89d4af3ddc..711ecf8ea9 100644 --- a/integrations/fastembed/CHANGELOG.md +++ b/integrations/fastembed/CHANGELOG.md @@ -3,17 +3,17 @@ ## [Unreleased] ### 🚀 Features -- Add `FastembedColbertReranker`: ColBERT late-interaction (MaxSim) reranker for 1st-stage candidates (100–500). (#PR_NUMBER) +- Add `FastembedColbertReranker`: ColBERT late-interaction (MaxSim) reranker for 1st-stage candidates (100–500). (#2482) ### 📚 Documentation -- FastEmbed docs: “ColBERT Reranker” section with quickstart + pipeline example. (#PR_NUMBER) -- Integration README: add bullet for `FastembedColbertReranker` and links. (#PR_NUMBER) +- FastEmbed docs: “ColBERT Reranker” section with quickstart + pipeline example. (#2482) +- Integration README: add bullet for `FastembedColbertReranker` and links. (#2482) ### 🧪 Testing -- Unit tests for MaxSim math, parameter validation, stable tie-break, and monkeypatched ranking (no model download). (#PR_NUMBER) +- Unit tests for MaxSim math, parameter validation, stable tie-break, and monkeypatched ranking (no model download). (#2482) ### 🧹 Chores -- Parameter validation, robust warm-up for fastembed API drift, stable sorting on score ties. (#PR_NUMBER) +- Parameter validation, robust warm-up for fastembed API drift, stable sorting on score ties. 
(#2482) ## [integrations/fastembed-v1.5.0] - 2025-06-17 From bf4b2d186647e0f908e5899ace8d10ed50aba0ab Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 30 Oct 2025 13:05:39 +0530 Subject: [PATCH 06/14] chore(fastembed): satisfy Ruff EM101/EM102 and other lint rules Signed-off-by: Arya Tayshete --- .../rankers/fastembed/colbert_reranker.py | 46 ++++++++----------- .../components/rankers/fastembed/ranker.py | 1 - 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index 3dbc0f0b70..cc784af980 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -3,12 +3,11 @@ from __future__ import annotations -from typing import Any, List, Optional, Sequence - +from typing import Any, Sequence import numpy as np from haystack import Document, component -_TWO_D = 2 # avoid PLR2004 "magic number" warning +_TWO_D = 2 _VALID_SIMS = {"cosine", "dot"} @@ -20,7 +19,7 @@ def _l2_normalize_rows(mat: np.ndarray) -> np.ndarray: msg = f"Expected 2D matrix, got shape {mat.shape}" raise ValueError(msg) norms = np.linalg.norm(mat, axis=1, keepdims=True) - norms = np.where(norms == 0.0, 1.0, norms) # avoid division by zero + norms = np.where(norms == 0.0, 1.0, norms) return mat / norms @@ -48,9 +47,8 @@ def _maxsim_score( q = q_mat d = d_mat - # [Lq, D] @ [D, Ld] -> [Lq, Ld] - sim = q @ d.T - max_per_q = sim.max(axis=1) # max over doc tokens for each query token + sim = q @ d.T # [Lq, D] @ [D, Ld] -> [Lq, Ld] + max_per_q = sim.max(axis=1) return float(max_per_q.sum()) @@ -59,7 +57,7 @@ class FastembedColbertReranker: """ Rerank Documents using ColBERT late-interaction scoring via FastEmbed. 
- This component expects a *retrieved* list of Documents (e.g., top 100–500) + This component expects a *retrieved* list of Documents (e.g., top 100-500) and reorders them by ColBERT MaxSim score with respect to the input query. """ @@ -88,7 +86,6 @@ def __init__( msg = f"batch_size must be > 0, got {batch_size}" raise ValueError(msg) - # Lazy-loaded in warm_up() self._encoder = None # LateInteractionTextEmbedding self._ready = False @@ -96,10 +93,9 @@ def warm_up(self): if self._ready: return try: - from fastembed import LateInteractionTextEmbedding # type: ignore + from fastembed import LateInteractionTextEmbedding # type: ignore # noqa: PLC0415 kwargs = {"model_name": self.model, "threads": self.threads} - # best-effort: only pass token kwargs if supported for k, v in { "max_tokens_query": self.max_query_tokens, "max_tokens_document": self.max_doc_tokens, @@ -110,11 +106,8 @@ def warm_up(self): try: self._encoder = LateInteractionTextEmbedding(**kwargs) except TypeError: - # remove unknown kwargs & retry - safe_kwargs = {"model_name": self.model, "threads": self.threads} - self._encoder = LateInteractionTextEmbedding(**safe_kwargs) + self._encoder = LateInteractionTextEmbedding(model_name=self.model, threads=self.threads) - # tiny dry runs (guard generators possibly being empty) gen_q = self._encoder.query_embed(["warmup"]) next(gen_q, None) gen_d = self._encoder.embed(["warmup"]) @@ -126,9 +119,7 @@ def warm_up(self): "fastembed is not installed. 
Please install the FastEmbed integration:\n\n" " pip install fastembed-haystack\n" ) - raise RuntimeError( - msg - ) from e + raise RuntimeError(msg) from e except Exception as e: msg = f"Failed to initialize FastEmbed ColBERT encoder for model '{self.model}': {e}" raise RuntimeError(msg) from e @@ -139,21 +130,24 @@ def _ensure_ready(self): @staticmethod def _get_texts(documents: Sequence[Document]) -> list[str]: - # Prefer Document.content; fall back to empty string to avoid crashes return [doc.content or "" for doc in documents] def _encode_query(self, text: str) -> np.ndarray: - assert self._encoder is not None + if self._encoder is None: + msg = "Encoder is not initialized. Call warm_up() first." + raise RuntimeError(msg) arr = next(self._encoder.query_embed([text]), None) if arr is None: return np.zeros((0, 0), dtype=np.float32) a = np.asarray(arr, dtype=np.float32) - if a.ndim != 2: - a = a.reshape(-1, a.shape[-1]) # best-effort + if a.ndim != _TWO_D: + a = a.reshape(-1, a.shape[-1]) return a def _encode_docs_batched(self, texts: list[str]) -> list[np.ndarray]: - assert self._encoder is not None + if self._encoder is None: + msg = "Encoder is not initialized. Call warm_up() first." 
+ raise RuntimeError(msg) results: list[np.ndarray] = [] for start in range(0, len(texts), self.batch_size): batch = texts[start : start + self.batch_size] @@ -162,7 +156,7 @@ def _encode_docs_batched(self, texts: list[str]) -> list[np.ndarray]: results.append(np.zeros((0, 0), dtype=np.float32)) else: a = np.asarray(emb, dtype=np.float32) - if a.ndim != 2: + if a.ndim != _TWO_D: a = a.reshape(-1, a.shape[-1]) results.append(a) if len(results) != len(texts): @@ -186,13 +180,11 @@ def run( msg = f"top_k must be >= 0, got {top_k}" raise ValueError(msg) - # keep original order indices for a stable tie-break for i, d in enumerate(docs_list): if getattr(d, "meta", None) is None: d.meta = {} d.meta.setdefault("_orig_idx", i) - # --- actual scoring --- q_mat = self._encode_query(query) doc_texts = self._get_texts(docs_list) doc_mats = self._encode_docs_batched(doc_texts) @@ -200,7 +192,6 @@ def run( for d, d_mat in zip(docs_list, doc_mats): d.score = _maxsim_score(q_mat, d_mat, similarity=self.similarity, normalize=self.normalize) - # sort by score desc; deterministic tie-break by original index docs_list.sort( key=lambda d: ( d.score if d.score is not None else float("-inf"), @@ -209,7 +200,6 @@ def run( reverse=True, ) - # strip helper key for d in docs_list: d.meta.pop("_orig_idx", None) diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py index 3006135a00..bf859c2f4c 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py @@ -1,7 +1,6 @@ from typing import Any, Dict, List, Optional from haystack import Document, component, default_from_dict, default_to_dict, logging - from fastembed.rerank.cross_encoder import TextCrossEncoder logger = logging.getLogger(__name__) From 
fb779930c62f2d927884ed5322cec6db6326ee67 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 30 Oct 2025 13:15:44 +0530 Subject: [PATCH 07/14] fix(fastembed): deterministic tie-break on equal scores (descending orig_idx) Signed-off-by: Arya Tayshete --- .../rankers/fastembed/colbert_reranker.py | 11 ++++++++--- .../components/rankers/fastembed/ranker.py | 18 +++++++++++++----- .../test_fastembed_colbert_reranker.py | 7 +++---- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index cc784af980..9794afb00a 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -4,6 +4,7 @@ from __future__ import annotations from typing import Any, Sequence + import numpy as np from haystack import Document, component @@ -106,7 +107,9 @@ def warm_up(self): try: self._encoder = LateInteractionTextEmbedding(**kwargs) except TypeError: - self._encoder = LateInteractionTextEmbedding(model_name=self.model, threads=self.threads) + self._encoder = LateInteractionTextEmbedding( + model_name=self.model, threads=self.threads + ) gen_q = self._encoder.query_embed(["warmup"]) next(gen_q, None) @@ -190,12 +193,14 @@ def run( doc_mats = self._encode_docs_batched(doc_texts) for d, d_mat in zip(docs_list, doc_mats): - d.score = _maxsim_score(q_mat, d_mat, similarity=self.similarity, normalize=self.normalize) + d.score = _maxsim_score( + q_mat, d_mat, similarity=self.similarity, normalize=self.normalize + ) docs_list.sort( key=lambda d: ( d.score if d.score is not None else float("-inf"), - -d.meta.get("_orig_idx", 0), + d.meta.get("_orig_idx", 0), ), reverse=True, ) diff --git 
a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py index bf859c2f4c..2f9604909a 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py @@ -1,7 +1,7 @@ from typing import Any, Dict, List, Optional -from haystack import Document, component, default_from_dict, default_to_dict, logging from fastembed.rerank.cross_encoder import TextCrossEncoder +from haystack import Document, component, default_from_dict, default_to_dict, logging logger = logging.getLogger(__name__) @@ -131,15 +131,21 @@ def _prepare_fastembed_input_docs(self, documents: List[Document]) -> List[str]: concatenated_input_list = [] for doc in documents: meta_values_to_embed = [ - str(doc.meta[key]) for key in self.meta_fields_to_embed if key in doc.meta and doc.meta.get(key) + str(doc.meta[key]) + for key in self.meta_fields_to_embed + if key in doc.meta and doc.meta.get(key) ] - concatenated_input = self.meta_data_separator.join([*meta_values_to_embed, doc.content or ""]) + concatenated_input = self.meta_data_separator.join( + [*meta_values_to_embed, doc.content or ""] + ) concatenated_input_list.append(concatenated_input) return concatenated_input_list @component.output_types(documents=List[Document]) - def run(self, query: str, documents: List[Document], top_k: Optional[int] = None) -> Dict[str, List[Document]]: + def run( + self, query: str, documents: List[Document], top_k: Optional[int] = None + ) -> Dict[str, List[Document]]: """ Returns a list of documents ranked by their similarity to the given query, using FastEmbed. @@ -156,7 +162,9 @@ def run(self, query: str, documents: List[Document], top_k: Optional[int] = None :raises ValueError: If `top_k` is not > 0. 
""" - if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)): + if not isinstance(documents, list) or ( + documents and not isinstance(documents[0], Document) + ): msg = "FastembedRanker expects a list of Documents as input. " raise TypeError(msg) if query == "": diff --git a/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py b/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py index ac717550a7..c29c7eab93 100644 --- a/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py +++ b/tests/components/rankers/fastembed/test_fastembed_colbert_reranker.py @@ -1,15 +1,14 @@ import numpy as np import pytest from haystack import Document -from integrations.fastembed.haystack_integrations.components.rankers.fastembed import ( - FastembedColbertReranker, -) -from integrations.fastembed.haystack_integrations.components.rankers.fastembed.colbert_reranker import ( +from haystack_integrations.components.rankers.fastembed import FastembedColbertReranker +from haystack_integrations.components.rankers.fastembed.colbert_reranker import ( _maxsim_score, _l2_normalize_rows, ) + def test_symbol_imports(): rr = FastembedColbertReranker() assert hasattr(rr, "run") From 654869344879068aaa8099a8e177fffb5ed4647d Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 30 Oct 2025 14:28:14 +0530 Subject: [PATCH 08/14] chore(fastembed): 3.9-friendly typing (Optional/List) Signed-off-by: Arya Tayshete --- .../components/rankers/fastembed/colbert_reranker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index 9794afb00a..7560bfe8b9 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ 
b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -3,7 +3,7 @@ from __future__ import annotations -from typing import Any, Sequence +from typing import Any, Sequence, List, Optional import numpy as np from haystack import Document, component From 6d30a6564d6639d001e9c45acf5f3eb2534cc0c5 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 30 Oct 2025 15:01:25 +0530 Subject: [PATCH 09/14] chore: remove accidental submodule 'haystack-core-integrations' Signed-off-by: Arya Tayshete --- haystack-core-integrations | 1 - 1 file changed, 1 deletion(-) delete mode 160000 haystack-core-integrations diff --git a/haystack-core-integrations b/haystack-core-integrations deleted file mode 160000 index f8cc0807cc..0000000000 --- a/haystack-core-integrations +++ /dev/null @@ -1 +0,0 @@ -Subproject commit f8cc0807cc33950ca6aa07d51a7b90b7c21e665e From b909590395a3a001e91d6e67396c61856c967923 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 30 Oct 2025 15:22:46 +0530 Subject: [PATCH 10/14] fix lint: remove unused imports in integrations/fastembed Signed-off-by: Arya Tayshete --- .../rankers/fastembed/colbert_utils.py | 2 +- .../rankers/fastembed/colbert_reranker.py | 10 +++------- .../components/rankers/fastembed/ranker.py | 16 ++++------------ 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py index c3669a808f..8a7e11ccc4 100644 --- a/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py +++ b/integrations/fastembed/haystack_integrations/components/rankers/fastembed/colbert_utils.py @@ -1,4 +1,4 @@ from __future__ import annotations -# We’ll implement real MaxSim scoring utilities here in Step 3. +# We will implement real MaxSim scoring utilities here in Step 3. 
# Keeping the file so imports are stable when we wire things up. diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index 7560bfe8b9..8dc87efa97 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -3,7 +3,7 @@ from __future__ import annotations -from typing import Any, Sequence, List, Optional +from typing import Any, Sequence import numpy as np from haystack import Document, component @@ -107,9 +107,7 @@ def warm_up(self): try: self._encoder = LateInteractionTextEmbedding(**kwargs) except TypeError: - self._encoder = LateInteractionTextEmbedding( - model_name=self.model, threads=self.threads - ) + self._encoder = LateInteractionTextEmbedding(model_name=self.model, threads=self.threads) gen_q = self._encoder.query_embed(["warmup"]) next(gen_q, None) @@ -193,9 +191,7 @@ def run( doc_mats = self._encode_docs_batched(doc_texts) for d, d_mat in zip(docs_list, doc_mats): - d.score = _maxsim_score( - q_mat, d_mat, similarity=self.similarity, normalize=self.normalize - ) + d.score = _maxsim_score(q_mat, d_mat, similarity=self.similarity, normalize=self.normalize) docs_list.sort( key=lambda d: ( diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py index 2f9604909a..87656e4218 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py @@ -131,21 +131,15 @@ def _prepare_fastembed_input_docs(self, documents: List[Document]) -> List[str]: concatenated_input_list = [] for doc in documents: 
meta_values_to_embed = [ - str(doc.meta[key]) - for key in self.meta_fields_to_embed - if key in doc.meta and doc.meta.get(key) + str(doc.meta[key]) for key in self.meta_fields_to_embed if key in doc.meta and doc.meta.get(key) ] - concatenated_input = self.meta_data_separator.join( - [*meta_values_to_embed, doc.content or ""] - ) + concatenated_input = self.meta_data_separator.join([*meta_values_to_embed, doc.content or ""]) concatenated_input_list.append(concatenated_input) return concatenated_input_list @component.output_types(documents=List[Document]) - def run( - self, query: str, documents: List[Document], top_k: Optional[int] = None - ) -> Dict[str, List[Document]]: + def run(self, query: str, documents: List[Document], top_k: Optional[int] = None) -> Dict[str, List[Document]]: """ Returns a list of documents ranked by their similarity to the given query, using FastEmbed. @@ -162,9 +156,7 @@ def run( :raises ValueError: If `top_k` is not > 0. """ - if not isinstance(documents, list) or ( - documents and not isinstance(documents[0], Document) - ): + if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)): msg = "FastembedRanker expects a list of Documents as input. 
" raise TypeError(msg) if query == "": From 7198b8346a39a4516e44ed903062608b1df31db7 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 30 Oct 2025 15:49:31 +0530 Subject: [PATCH 11/14] fix: python 3.9 on linux Signed-off-by: Arya Tayshete --- .../components/rankers/fastembed/colbert_reranker.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index 8dc87efa97..48296d36af 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -104,11 +104,10 @@ def warm_up(self): if v is not None: kwargs[k] = v - try: - self._encoder = LateInteractionTextEmbedding(**kwargs) - except TypeError: - self._encoder = LateInteractionTextEmbedding(model_name=self.model, threads=self.threads) - + # Only include keys that are not None + init_kwargs = {k: v for k, v in kwargs.items() if v is not None} + self._encoder = LateInteractionTextEmbedding(**init_kwargs) + gen_q = self._encoder.query_embed(["warmup"]) next(gen_q, None) gen_d = self._encoder.embed(["warmup"]) From fdb1d9e61e5e2c6eb965d1644c941064b7814131 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Thu, 30 Oct 2025 15:55:48 +0530 Subject: [PATCH 12/14] fix: trailing whitespaces Signed-off-by: Arya Tayshete --- .../components/rankers/fastembed/colbert_reranker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index 48296d36af..701b947d90 100644 --- 
a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -107,7 +107,7 @@ def warm_up(self): # Only include keys that are not None init_kwargs = {k: v for k, v in kwargs.items() if v is not None} self._encoder = LateInteractionTextEmbedding(**init_kwargs) - + gen_q = self._encoder.query_embed(["warmup"]) next(gen_q, None) gen_d = self._encoder.embed(["warmup"]) From 3cf8a37af85b8af640c2ef23009ea6b76cea6488 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Fri, 31 Oct 2025 11:12:36 +0530 Subject: [PATCH 13/14] fix: python 3.9 on linux Signed-off-by: Arya Tayshete --- .../rankers/fastembed/colbert_reranker.py | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index 701b947d90..30387cfc5c 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -3,11 +3,13 @@ from __future__ import annotations -from typing import Any, Sequence - +from typing import Any, Sequence, Optional, TYPE_CHECKING import numpy as np from haystack import Document, component +if TYPE_CHECKING: + from fastembed import LateInteractionTextEmbedding # type: ignore + _TWO_D = 2 _VALID_SIMS = {"cosine", "dot"} @@ -87,7 +89,7 @@ def __init__( msg = f"batch_size must be > 0, got {batch_size}" raise ValueError(msg) - self._encoder = None # LateInteractionTextEmbedding + self._encoder: Optional["LateInteractionTextEmbedding"] = None # LateInteractionTextEmbedding self._ready = False def warm_up(self): @@ -96,18 +98,14 @@ def warm_up(self): try: from fastembed import 
LateInteractionTextEmbedding # type: ignore # noqa: PLC0415 - kwargs = {"model_name": self.model, "threads": self.threads} - for k, v in { - "max_tokens_query": self.max_query_tokens, - "max_tokens_document": self.max_doc_tokens, - }.items(): - if v is not None: - kwargs[k] = v - - # Only include keys that are not None - init_kwargs = {k: v for k, v in kwargs.items() if v is not None} - self._encoder = LateInteractionTextEmbedding(**init_kwargs) - + # Call constructor with explicit named arguments so the type checker can verify them. + # Passing None for optional args is OK if the fastembed constructor accepts Optional values. + self._encoder = LateInteractionTextEmbedding( + model_name=self.model, + threads=self.threads, + max_tokens_query=self.max_query_tokens, + max_tokens_document=self.max_doc_tokens, + ) gen_q = self._encoder.query_embed(["warmup"]) next(gen_q, None) gen_d = self._encoder.embed(["warmup"]) From 217d21323a8b94f974db9478734b5e6e8d894f34 Mon Sep 17 00:00:00 2001 From: Arya Tayshete Date: Fri, 31 Oct 2025 11:30:53 +0530 Subject: [PATCH 14/14] chore: silence UP045 on Optional for py3.9 Signed-off-by: Arya Tayshete --- .../rankers/fastembed/colbert_reranker.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py index 30387cfc5c..e2a26ee710 100644 --- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py +++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py @@ -3,7 +3,8 @@ from __future__ import annotations -from typing import Any, Sequence, Optional, TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Iterator, Optional, Sequence + import numpy as np from haystack import Document, component @@ -89,7 +90,7 @@ def __init__( msg = 
f"batch_size must be > 0, got {batch_size}" raise ValueError(msg) - self._encoder: Optional["LateInteractionTextEmbedding"] = None # LateInteractionTextEmbedding + self._encoder: Optional[LateInteractionTextEmbedding] = None # LateInteractionTextEmbedding # noqa: UP045 self._ready = False def warm_up(self): @@ -106,10 +107,10 @@ def warm_up(self): max_tokens_query=self.max_query_tokens, max_tokens_document=self.max_doc_tokens, ) - gen_q = self._encoder.query_embed(["warmup"]) - next(gen_q, None) - gen_d = self._encoder.embed(["warmup"]) - next(gen_d, None) + gen_q_iter: Iterator[np.ndarray] = iter(self._encoder.query_embed(["warmup"])) + next(gen_q_iter, None) + gen_d_iter: Iterator[np.ndarray] = iter(self._encoder.embed(["warmup"])) + next(gen_d_iter, None) self._ready = True except ModuleNotFoundError as e: @@ -134,7 +135,8 @@ def _encode_query(self, text: str) -> np.ndarray: if self._encoder is None: msg = "Encoder is not initialized. Call warm_up() first." raise RuntimeError(msg) - arr = next(self._encoder.query_embed([text]), None) + it: Iterator[np.ndarray] = iter(self._encoder.query_embed([text])) + arr = next(it, None) if arr is None: return np.zeros((0, 0), dtype=np.float32) a = np.asarray(arr, dtype=np.float32)