diff --git a/examples/fastembed_colbert_reranker.py b/examples/fastembed_colbert_reranker.py
new file mode 100644
index 0000000000..d2b853f16a
--- /dev/null
+++ b/examples/fastembed_colbert_reranker.py
@@ -0,0 +1,20 @@
+from haystack import Document
+from haystack.document_stores.in_memory import InMemoryDocumentStore
+from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
+from haystack_integrations.components.rankers.fastembed import FastembedColbertReranker
+
+store = InMemoryDocumentStore()
+store.write_documents([
+    Document(content="ColBERT uses late interaction over token embeddings."),
+    Document(content="BM25 is a classic sparse retrieval model."),
+    Document(content="Cross-encoders directly score query–document pairs."),
+])
+
+retriever = InMemoryBM25Retriever(document_store=store)
+reranker = FastembedColbertReranker(model="colbert-ir/colbertv2.0")
+
+query = "late interaction reranking"
+cand = retriever.run(query=query, top_k=200)
+out = reranker.run(query=query, documents=cand["documents"], top_k=5)
+for i, d in enumerate(out["documents"], 1):
+    print(i, round(d.score, 3), d.content)
diff --git a/integrations/fastembed/CHANGELOG.md b/integrations/fastembed/CHANGELOG.md
index d771f8c2fb..711ecf8ea9 100644
--- a/integrations/fastembed/CHANGELOG.md
+++ b/integrations/fastembed/CHANGELOG.md
@@ -1,5 +1,21 @@
 # Changelog
 
+## [Unreleased]
+
+### 🚀 Features
+- Add `FastembedColbertReranker`: a ColBERT late-interaction (MaxSim) reranker that reorders first-stage retrieval candidates (typically 100–500 documents). (#2482)
+
+### 📚 Documentation
+- FastEmbed docs: “ColBERT Reranker” section with a quickstart and a pipeline example. (#2482)
+- Integration README: add a bullet for `FastembedColbertReranker` plus doc links. (#2482)
+
+### 🧪 Testing
+- Unit tests for the MaxSim math, parameter validation, stable tie-breaking, and stubbed-encoder ranking (no model download). (#2482)
+
+### 🧹 Chores
+- Parameter validation, warm-up that tolerates fastembed API drift, stable sorting on score ties. (#2482)
+
+
 ## [integrations/fastembed-v1.5.0] - 2025-06-17
 
 ### 🐛 Bug Fixes
diff --git a/integrations/fastembed/README.md b/integrations/fastembed/README.md
index 65596aa462..3ee2edbe8d 100644
--- a/integrations/fastembed/README.md
+++ b/integrations/fastembed/README.md
@@ -6,6 +6,26 @@
 - [Integration page](https://haystack.deepset.ai/integrations/fastembed)
 - [Changelog](https://github.com/deepset-ai/haystack-core-integrations/blob/main/integrations/fastembed/CHANGELOG.md)
 
+## Components
+
+- **FastembedTextEmbedder / FastembedDocumentEmbedder / Sparse embedders** – embedding components for dense and sparse retrieval (see docs).
+- **FastembedRanker** – cross-encoder reranker (see docs).
+- **FastembedColbertReranker** – *new* ColBERT late-interaction (MaxSim) reranker for reordering 100–500 candidates on CPU via ONNX.
+  → See the docs for installation and examples.
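+
+### Quick example
+
+A minimal sketch of reranking BM25 candidates (the `store` is assumed to be an `InMemoryDocumentStore` with documents already written; see `examples/fastembed_colbert_reranker.py` in this repository for a complete runnable script):
+
+```python
+from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
+from haystack_integrations.components.rankers.fastembed import FastembedColbertReranker
+
+retriever = InMemoryBM25Retriever(document_store=store)
+reranker = FastembedColbertReranker(model="colbert-ir/colbertv2.0")
+
+query = "late interaction reranking"
+candidates = retriever.run(query=query, top_k=200)
+result = reranker.run(query=query, documents=candidates["documents"], top_k=5)
+```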
+
+### Documentation
+
+- Integration page: https://haystack.deepset.ai/integrations/fastembed
+- Component docs:
+  - Fastembed embedders: https://docs.haystack.deepset.ai/docs/fastembedtextembedder
+  - FastembedRanker (cross-encoder): https://docs.haystack.deepset.ai/docs/fastembedranker
+  - **FastembedColbertReranker (ColBERT)**: *[link to your new docs section]*
+
+### Install
+
+```bash
+pip install fastembed-haystack
+```
+
 ---
 
 ## Contributing
diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_utils.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_utils.py
new file mode 100644
index 0000000000..8a7e11ccc4
--- /dev/null
+++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_utils.py
@@ -0,0 +1,4 @@
+from __future__ import annotations
+
+# Placeholder for shared MaxSim scoring utilities. The initial implementation
+# lives in colbert_reranker.py; keeping this module keeps future imports stable.
diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/__init__.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/__init__.py
index ece5e858b9..b1999306c6 100644
--- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/__init__.py
+++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/__init__.py
@@ -1,3 +1,4 @@
+from .colbert_reranker import FastembedColbertReranker
 from .ranker import FastembedRanker
 
-__all__ = ["FastembedRanker"]
+__all__ = ["FastembedColbertReranker", "FastembedRanker"]
diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py
new file mode 100644
index 0000000000..e2a26ee710
--- /dev/null
+++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/colbert_reranker.py
@@ -0,0 +1,209 @@
+# SPDX-FileCopyrightText: 2025-present deepset GmbH
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Iterator, Optional, Sequence
+
+import numpy as np
+from haystack import Document, component
+
+if TYPE_CHECKING:
+    from fastembed import LateInteractionTextEmbedding  # type: ignore
+
+_TWO_D = 2
+_VALID_SIMS = {"cosine", "dot"}
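+
+
+# Worked example of the MaxSim scoring implemented below (illustrative numbers,
+# mirrored by the unit tests):
+#   q = [[1, 0], [0, 1]]              -> two query token vectors
+#   d = [[1, 0], [0.5, 0.5], [0, 1]]  -> three document token vectors
+# With cosine similarity each query token's best-matching document token scores
+# 1.0, so the late-interaction (MaxSim) score is 1.0 + 1.0 = 2.0.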
+
+
+def _l2_normalize_rows(mat: np.ndarray) -> np.ndarray:
+    """
+    L2-normalize each row vector in a 2D array. Safe for zero rows.
+    """
+    if mat.ndim != _TWO_D:
+        msg = f"Expected 2D matrix, got shape {mat.shape}"
+        raise ValueError(msg)
+    norms = np.linalg.norm(mat, axis=1, keepdims=True)
+    norms = np.where(norms == 0.0, 1.0, norms)
+    return mat / norms
+
+
+def _maxsim_score(
+    q_mat: np.ndarray,
+    d_mat: np.ndarray,
+    *,
+    similarity: str = "cosine",
+    normalize: bool = True,
+) -> float:
+    """
+    ColBERT late-interaction (MaxSim) score:
+    For each query token vector, take the maximum similarity across all doc token vectors, then sum.
+    """
+    if q_mat.size == 0 or d_mat.size == 0:
+        return 0.0
+    if similarity not in _VALID_SIMS:
+        msg = f"Unsupported similarity '{similarity}'. Use 'cosine' or 'dot'."
+        raise ValueError(msg)
+
+    if similarity == "cosine" and normalize:
+        q = _l2_normalize_rows(q_mat)
+        d = _l2_normalize_rows(d_mat)
+    else:
+        q = q_mat
+        d = d_mat
+
+    sim = q @ d.T  # [Lq, D] @ [D, Ld] -> [Lq, Ld]
+    max_per_q = sim.max(axis=1)
+    return float(max_per_q.sum())
+
+
+@component
+class FastembedColbertReranker:
+    """
+    Rerank Documents using ColBERT late-interaction scoring via FastEmbed.
+
+    This component expects a *retrieved* list of Documents (e.g., top 100-500)
+    and reorders them by ColBERT MaxSim score with respect to the input query.
+    """
+
+    def __init__(
+        self,
+        model: str = "colbert-ir/colbertv2.0",
+        batch_size: int = 16,
+        threads: int | None = None,
+        similarity: str = "cosine",
+        normalize: bool = True,
+        max_query_tokens: int | None = None,
+        max_doc_tokens: int | None = None,
+    ):
+        if similarity not in _VALID_SIMS:
+            msg = f"similarity must be one of {_VALID_SIMS}, got {similarity!r}"
+            raise ValueError(msg)
+        if batch_size <= 0:
+            msg = f"batch_size must be > 0, got {batch_size}"
+            raise ValueError(msg)
+
+        self.model = model
+        self.batch_size = batch_size
+        self.threads = threads
+        self.similarity = similarity
+        self.normalize = normalize
+        self.max_query_tokens = max_query_tokens
+        self.max_doc_tokens = max_doc_tokens
+
+        self._encoder: Optional[LateInteractionTextEmbedding] = None  # noqa: UP045
+        self._ready = False
+
+    def warm_up(self):
+        if self._ready:
+            return
+        try:
+            from fastembed import LateInteractionTextEmbedding  # type: ignore  # noqa: PLC0415
+
+            # The max-token kwargs are not guaranteed to exist across fastembed
+            # releases, so retry without them if the installed version rejects them.
+            try:
+                self._encoder = LateInteractionTextEmbedding(
+                    model_name=self.model,
+                    threads=self.threads,
+                    max_tokens_query=self.max_query_tokens,
+                    max_tokens_document=self.max_doc_tokens,
+                )
+            except TypeError:
+                self._encoder = LateInteractionTextEmbedding(
+                    model_name=self.model,
+                    threads=self.threads,
+                )
+            gen_q_iter: Iterator[np.ndarray] = iter(self._encoder.query_embed(["warmup"]))
+            next(gen_q_iter, None)
+            gen_d_iter: Iterator[np.ndarray] = iter(self._encoder.embed(["warmup"]))
+            next(gen_d_iter, None)
+
+            self._ready = True
+        except ModuleNotFoundError as e:
+            msg = (
+                "fastembed is not installed. Please install the FastEmbed integration:\n\n"
+                "    pip install fastembed-haystack\n"
+            )
+            raise RuntimeError(msg) from e
+        except Exception as e:
+            msg = f"Failed to initialize FastEmbed ColBERT encoder for model '{self.model}': {e}"
+            raise RuntimeError(msg) from e
+
+    def _ensure_ready(self):
+        if not self._ready:
+            self.warm_up()
+
+    @staticmethod
+    def _get_texts(documents: Sequence[Document]) -> list[str]:
+        return [doc.content or "" for doc in documents]
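+
+    # fastembed's late-interaction encoders yield one token-embedding matrix per
+    # input text (typically of shape [num_tokens, embedding_dim]); the two helpers
+    # below defensively coerce any other shape back to 2D before MaxSim scoring.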
+    def _encode_query(self, text: str) -> np.ndarray:
+        if self._encoder is None:
+            msg = "Encoder is not initialized. Call warm_up() first."
+            raise RuntimeError(msg)
+        it: Iterator[np.ndarray] = iter(self._encoder.query_embed([text]))
+        arr = next(it, None)
+        if arr is None:
+            return np.zeros((0, 0), dtype=np.float32)
+        a = np.asarray(arr, dtype=np.float32)
+        if a.ndim != _TWO_D:
+            a = a.reshape(-1, a.shape[-1])
+        return a
+
+    def _encode_docs_batched(self, texts: list[str]) -> list[np.ndarray]:
+        if self._encoder is None:
+            msg = "Encoder is not initialized. Call warm_up() first."
+            raise RuntimeError(msg)
+        results: list[np.ndarray] = []
+        for start in range(0, len(texts), self.batch_size):
+            batch = texts[start : start + self.batch_size]
+            for emb in self._encoder.embed(batch):
+                if emb is None:
+                    results.append(np.zeros((0, 0), dtype=np.float32))
+                else:
+                    a = np.asarray(emb, dtype=np.float32)
+                    if a.ndim != _TWO_D:
+                        a = a.reshape(-1, a.shape[-1])
+                    results.append(a)
+        if len(results) != len(texts):
+            msg = f"Encoder returned {len(results)} embeddings for {len(texts)} documents."
+            raise RuntimeError(msg)
+        return results
+
+    @component.output_types(documents=list[Document])
+    def run(
+        self,
+        query: str,
+        documents: Sequence[Document],
+        top_k: int | None = None,
+    ) -> dict[str, Any]:
+        self._ensure_ready()
+
+        docs_list = list(documents)
+        if not docs_list:
+            return {"documents": []}
+
+        if top_k is not None and top_k < 0:
+            msg = f"top_k must be >= 0, got {top_k}"
+            raise ValueError(msg)
+
+        for i, d in enumerate(docs_list):
+            if getattr(d, "meta", None) is None:
+                d.meta = {}
+            d.meta.setdefault("_orig_idx", i)
+
+        q_mat = self._encode_query(query)
+        doc_texts = self._get_texts(docs_list)
+        doc_mats = self._encode_docs_batched(doc_texts)
+
+        for d, d_mat in zip(docs_list, doc_mats):
+            d.score = _maxsim_score(q_mat, d_mat, similarity=self.similarity, normalize=self.normalize)
+
+        # Sort by score (descending); ties keep the original retrieval order,
+        # which makes the reranker stable and deterministic.
+        docs_list.sort(
+            key=lambda d: (
+                -(d.score if d.score is not None else float("-inf")),
+                d.meta.get("_orig_idx", 0),
+            )
+        )
+
+        for d in docs_list:
+            d.meta.pop("_orig_idx", None)
+
+        if top_k is not None:
+            docs_list = docs_list[:top_k]
+
+        return {"documents": docs_list}
diff --git a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py
index 3006135a00..87656e4218 100644
--- a/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py
+++ b/integrations/fastembed/src/haystack_integrations/components/rankers/fastembed/ranker.py
@@ -1,8 +1,7 @@
 from typing import Any, Dict, List, Optional
 
-from haystack import Document, component, default_from_dict, default_to_dict, logging
-
 from fastembed.rerank.cross_encoder import TextCrossEncoder
+from haystack import Document, component, default_from_dict, default_to_dict, logging
 
 logger = logging.getLogger(__name__)
diff --git a/integrations/fastembed/tests/test_fastembed_colbert_reranker.py b/integrations/fastembed/tests/test_fastembed_colbert_reranker.py
new file mode 100644
index 0000000000..c29c7eab93
--- /dev/null
+++ b/integrations/fastembed/tests/test_fastembed_colbert_reranker.py
@@ -0,0 +1,120 @@
+import numpy as np
+import pytest
+from haystack import Document
+from haystack_integrations.components.rankers.fastembed import FastembedColbertReranker
+from haystack_integrations.components.rankers.fastembed.colbert_reranker import (
+    _l2_normalize_rows,
+    _maxsim_score,
+)
+
+
+def test_symbol_imports():
+    rr = FastembedColbertReranker()
+    assert hasattr(rr, "run")
+
+
+def test_l2_normalize_rows_shapes_and_zeros():
+    x = np.array([[3.0, 4.0], [0.0, 0.0]], dtype=np.float32)
+    y = _l2_normalize_rows(x)
+    # First row becomes unit norm, second row stays zeros (safe divide)
+    assert y.shape == x.shape
+    assert np.isclose(np.linalg.norm(y[0]), 1.0, atol=1e-6)
+    assert np.allclose(y[1], np.zeros_like(y[1]))
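+
+
+# The arithmetic behind the next test: after L2 normalization the "mid" document
+# token [0.5, 0.5] becomes [0.707, 0.707], so its cosine with either basis query
+# token is only ~0.707; the exact-match tokens still win at 1.0 each.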
+
+
+def test_maxsim_score_simple_cosine():
+    # Query has two tokens; doc has three tokens
+    q = np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32)  # e1, e2
+    d = np.array([[1.0, 0.0], [0.5, 0.5], [0.0, 1.0]], dtype=np.float32)  # e1, mid, e2
+
+    # With cosine + normalization, MaxSim per q-token is 1.0 for both -> sum = 2.0
+    score = _maxsim_score(q, d, similarity="cosine", normalize=True)
+    assert np.isclose(score, 2.0, atol=1e-6)
+
+
+def test_run_no_documents_returns_empty():
+    rr = FastembedColbertReranker()
+    result = rr.run(query="test", documents=[])
+    assert result["documents"] == []
+
+
+def test_run_topk_slices_without_fastembed():
+    """
+    The encoder methods are stubbed out to avoid requiring FastEmbed models in CI.
+    This also checks sorting by score and top_k behavior deterministically.
+    """
+    rr = FastembedColbertReranker()
+
+    # Fake "ready" state (skip warm_up + fastembed import)
+    rr._ready = True
+    rr._encoder = object()
+
+    # Query encoding -> 2x2 (arbitrary but fixed)
+    def fake_encode_query(text: str):
+        return np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32)
+
+    # Doc encodings -> create three docs with different alignments
+    def fake_encode_docs(texts):
+        mats = []
+        for t in texts:
+            if "best" in t:
+                mats.append(np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32))  # high
+            elif "ok" in t:
+                mats.append(np.array([[0.8, 0.2], [0.2, 0.8]], dtype=np.float32))  # medium
+            else:
+                mats.append(np.array([[1.0, 0.0]], dtype=np.float32))  # lower
+        return mats
+
+    rr._encode_query = fake_encode_query  # type: ignore
+    rr._encode_docs_batched = fake_encode_docs  # type: ignore
+
+    docs = [
+        Document(content="this is lower"),
+        Document(content="this is ok"),
+        Document(content="this is best"),
+    ]
+    out = rr.run(query="q", documents=docs, top_k=2)
+    ranked = out["documents"]
+
+    # Expect "best" first, then "ok"
+    assert len(ranked) == 2
+    assert "best" in ranked[0].content
+    assert "ok" in ranked[1].content
+    assert ranked[0].score >= ranked[1].score
+
+
+def test_param_validation():
+    with pytest.raises(ValueError):
+        FastembedColbertReranker(batch_size=0)
+    with pytest.raises(ValueError):
+        FastembedColbertReranker(similarity="euclid")  # unsupported
+
+
+def test_topk_validation():
+    rr = FastembedColbertReranker()
+    with pytest.raises(ValueError):
+        rr.run(query="q", documents=[Document(content="x")], top_k=-1)
+
+
+def test_stable_tie_break():
+    rr = FastembedColbertReranker()
+    rr._ready = True
+    rr._encoder = object()
+
+    def fake_q(_):  # same query vectors
+        return np.eye(2, dtype=np.float32)
+
+    def fake_docs(texts):
+        # craft docs with identical scores -> tie
+        mat = np.eye(2, dtype=np.float32)
+        return [mat for _ in texts]
+
+    rr._encode_query = fake_q  # type: ignore
+    rr._encode_docs_batched = fake_docs  # type: ignore
+
+    docs = [Document(content=f"doc{i}") for i in range(4)]
+    out = rr.run(query="q", documents=docs)
+    ranked = out["documents"]
+    # Ties must preserve the original retrieval order (stable sort)
+    assert [d.content for d in ranked] == ["doc0", "doc1", "doc2", "doc3"]
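+
+
+def test_maxsim_score_dot_product():
+    # A quick sketch exercising the non-normalized "dot" path of _maxsim_score:
+    # per-query-token maxima are 2.0 and 1.0, so the summed score is 3.0.
+    q = np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32)
+    d = np.array([[2.0, 0.0], [0.0, 1.0]], dtype=np.float32)
+    score = _maxsim_score(q, d, similarity="dot", normalize=False)
+    assert np.isclose(score, 3.0, atol=1e-6)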