diff --git a/haystack/components/retrievers/auto_merging_retriever.py b/haystack/components/retrievers/auto_merging_retriever.py
index 86e4638b8a..bbb1724c7c 100644
--- a/haystack/components/retrievers/auto_merging_retriever.py
+++ b/haystack/components/retrievers/auto_merging_retriever.py
@@ -167,3 +167,61 @@ def _try_merge_level(docs_to_merge: list[Document], docs_to_return: list[Documen
             return _try_merge_level(merged_docs, docs_to_return)
 
         return {"documents": _try_merge_level(documents, [])}
+
+    @component.output_types(documents=list[Document])
+    async def run_async(self, documents: list[Document]):
+        """
+        Asynchronously run the AutoMergingRetriever.
+
+        Recursively groups documents by their parents and merges them if they meet the threshold,
+        continuing up the hierarchy until no more merges are possible.
+
+        :param documents: List of leaf documents that were matched by a retriever
+        :returns:
+            List of documents (could be a mix of different hierarchy levels)
+        """
+
+        AutoMergingRetriever._check_valid_documents(documents)
+
+        async def _get_parent_doc(parent_id: str) -> Document:
+            parent_docs = await self.document_store.filter_documents_async(
+                {"field": "id", "operator": "==", "value": parent_id}
+            )
+            if len(parent_docs) != 1:  # zero or several matches for this id
+                raise ValueError(f"Expected 1 parent document with id {parent_id}, found {len(parent_docs)}")
+
+            parent_doc = parent_docs[0]
+            if not parent_doc.meta.get("__children_ids"):  # missing or empty list of children
+                raise ValueError(f"Parent document with id {parent_id} does not have any children.")
+
+            return parent_doc
+
+        async def _try_merge_level(docs_to_merge: list[Document], docs_to_return: list[Document]) -> list[Document]:
+            parent_doc_id_to_child_docs: dict[str, list[Document]] = defaultdict(list)  # to group documents by parent
+
+            for doc in docs_to_merge:
+                if doc.meta.get("__parent_id"):  # only docs that have parents
+                    parent_doc_id_to_child_docs[doc.meta["__parent_id"]].append(doc)
+                else:
+                    docs_to_return.append(doc)  # keep docs that have no parents
+
+            # Process each parent group; parents are fetched one at a time, in grouping order
+            merged_docs = []
+            for parent_doc_id, child_docs in parent_doc_id_to_child_docs.items():
+                parent_doc = await _get_parent_doc(parent_doc_id)
+
+                # Calculate merge score: fraction of the parent's children that were retrieved
+                score = len(child_docs) / len(parent_doc.meta["__children_ids"])
+                if score > self.threshold:  # strict: a score equal to the threshold does not merge
+                    merged_docs.append(parent_doc)  # Merge into parent
+                else:
+                    docs_to_return.extend(child_docs)  # Keep children separate
+
+            # if no new merges were made, we're done (merged_docs is empty here)
+            if not merged_docs:
+                return merged_docs + docs_to_return
+
+            # Recursively try to merge the next level: the merged parents become the new candidates
+            return await _try_merge_level(merged_docs, docs_to_return)
+
+        return {"documents": await _try_merge_level(documents, [])}
diff --git a/haystack/components/retrievers/filter_retriever.py b/haystack/components/retrievers/filter_retriever.py
index b0f31aa8a5..5eb1571b2a 100644
--- a/haystack/components/retrievers/filter_retriever.py
+++ b/haystack/components/retrievers/filter_retriever.py
@@ -92,3 +92,17 @@ def run(self, filters: Optional[dict[str, Any]] = None):
             A list of retrieved documents.
         """
         return {"documents": self.document_store.filter_documents(filters=filters or self.filters)}
+
+    @component.output_types(documents=list[Document])
+    async def run_async(self, filters: Optional[dict[str, Any]] = None):
+        """
+        Asynchronously run the FilterRetriever on the given input data.
+
+        :param filters:
+            A dictionary with filters to narrow down the search space.
+            If not specified or empty, the FilterRetriever uses the filters provided at initialization.
+        :returns:
+            A list of retrieved documents.
+        """
+        out_documents = await self.document_store.filter_documents_async(filters=filters or self.filters)
+        return {"documents": out_documents}
diff --git a/releasenotes/notes/support-async-in-filter-and-automerging-retrievers-1cc5535099ddd1d1.yaml b/releasenotes/notes/support-async-in-filter-and-automerging-retrievers-1cc5535099ddd1d1.yaml
new file mode 100644
index 0000000000..15b988448c
--- /dev/null
+++ b/releasenotes/notes/support-async-in-filter-and-automerging-retrievers-1cc5535099ddd1d1.yaml
@@ -0,0 +1,3 @@
+enhancements:
+  - |
+    Add run_async method to FilterRetriever and AutoMergingRetriever components to support asynchronous execution.
diff --git a/test/components/retrievers/test_auto_merging_retriever_async.py b/test/components/retrievers/test_auto_merging_retriever_async.py
new file mode 100644
index 0000000000..4913c2aeaf
--- /dev/null
+++ b/test/components/retrievers/test_auto_merging_retriever_async.py
@@ -0,0 +1,218 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import pytest
+
+from haystack import Document, Pipeline
+from haystack.components.preprocessors import HierarchicalDocumentSplitter
+from haystack.components.retrievers import InMemoryBM25Retriever
+from haystack.components.retrievers.auto_merging_retriever import AutoMergingRetriever
+from haystack.document_stores.in_memory import InMemoryDocumentStore
+
+
+class TestAutoMergingRetrieverAsync:
+    @pytest.mark.asyncio
+    async def test_run_missing_parent_id(self):
+        docs = [Document(content="test", meta={"__level": 1, "__block_size": 10})]
+        retriever = AutoMergingRetriever(InMemoryDocumentStore())
+        with pytest.raises(
+            ValueError, match="The matched leaf documents do not have the required meta field '__parent_id'"
+        ):
+            await retriever.run_async(documents=docs)
+
+    @pytest.mark.asyncio
+    async def test_run_missing_level(self):
+        docs = [Document(content="test", meta={"__parent_id": "parent1", "__block_size": 10})]
+
+        retriever = AutoMergingRetriever(InMemoryDocumentStore())
+        with pytest.raises(
+            ValueError, match="The matched leaf documents do not have the required meta field '__level'"
+        ):
+            await retriever.run_async(documents=docs)
+
+    @pytest.mark.asyncio
+    async def test_run_missing_block_size(self):
+        docs = [Document(content="test", meta={"__parent_id": "parent1", "__level": 1})]
+
+        retriever = AutoMergingRetriever(InMemoryDocumentStore())
+        with pytest.raises(
+            ValueError, match="The matched leaf documents do not have the required meta field '__block_size'"
+        ):
+            await retriever.run_async(documents=docs)
+
+    @pytest.mark.asyncio
+    async def test_run_mixed_valid_and_invalid_documents(self):
+        docs = [
+            Document(content="valid", meta={"__parent_id": "parent1", "__level": 1, "__block_size": 10}),
+            Document(content="invalid", meta={"__level": 1, "__block_size": 10}),
+        ]
+        retriever = AutoMergingRetriever(InMemoryDocumentStore())
+        with pytest.raises(
+            ValueError, match="The matched leaf documents do not have the required meta field '__parent_id'"
+        ):
+            await retriever.run_async(documents=docs)
+
+    @pytest.mark.asyncio
+    async def test_run_parent_not_found(self):
+        doc_store = InMemoryDocumentStore()
+        retriever = AutoMergingRetriever(doc_store, threshold=0.5)
+
+        # a leaf document with a non-existent parent_id
+        leaf_doc = Document(
+            content="test", meta={"__parent_id": "non_existent_parent", "__level": 1, "__block_size": 10}
+        )
+
+        with pytest.raises(ValueError, match="Expected 1 parent document with id non_existent_parent, found 0"):
+            await retriever.run_async([leaf_doc])
+
+    @pytest.mark.asyncio
+    async def test_run_parent_without_children_metadata(self):
+        """Test case where a parent document exists but doesn't have the __children_ids metadata field"""
+        doc_store = InMemoryDocumentStore()
+
+        # Create and store a parent document without __children_ids metadata
+        parent_doc = Document(
+            content="parent content",
+            id="parent1",
+            meta={
+                "__level": 1,  # Add other required metadata
+                "__block_size": 10,
+            },
+        )
+        doc_store.write_documents([parent_doc])
+
+        retriever = AutoMergingRetriever(doc_store, threshold=0.5)
+
+        # Create a leaf document that points to this parent
+        leaf_doc = Document(content="leaf content", meta={"__parent_id": "parent1", "__level": 2, "__block_size": 5})
+
+        with pytest.raises(ValueError, match="Parent document with id parent1 does not have any children"):
+            await retriever.run_async([leaf_doc])
+
+    @pytest.mark.asyncio
+    async def test_run_empty_documents(self):
+        retriever = AutoMergingRetriever(InMemoryDocumentStore())
+        assert await retriever.run_async([]) == {"documents": []}
+
+    @pytest.mark.asyncio
+    async def test_run_return_parent_document(self):
+        text = "The sun rose early in the morning. It cast a warm glow over the trees. Birds began to sing."
+
+        docs = [Document(content=text)]
+        builder = HierarchicalDocumentSplitter(block_sizes={10, 3}, split_overlap=0, split_by="word")
+        docs = builder.run(docs)
+
+        # store all non-leaf documents
+        doc_store_parents = InMemoryDocumentStore()
+        for doc in docs["documents"]:
+            if doc.meta["__children_ids"]:
+                doc_store_parents.write_documents([doc])
+        retriever = AutoMergingRetriever(doc_store_parents, threshold=0.5)
+
+        # assume we retrieved 2 leaf docs from the same parent, the parent document should be returned,
+        # since it has 3 children and the threshold=0.5, and we retrieved 2 children (2/3 = 0.66(6) > 0.5)
+        leaf_docs = [doc for doc in docs["documents"] if not doc.meta["__children_ids"]]
+        docs = await retriever.run_async(leaf_docs[4:6])
+        assert len(docs["documents"]) == 1
+        assert docs["documents"][0].content == "warm glow over the trees. Birds began to sing."
+        assert len(docs["documents"][0].meta["__children_ids"]) == 3
+
+    @pytest.mark.asyncio
+    async def test_run_return_leafs_document(self):
+        docs = [Document(content="The monarch of the wild blue yonder rises from the eastern side of the horizon.")]
+        builder = HierarchicalDocumentSplitter(block_sizes={10, 3}, split_overlap=0, split_by="word")
+        docs = builder.run(docs)
+
+        doc_store_parents = InMemoryDocumentStore()
+        for doc in docs["documents"]:
+            if doc.meta["__level"] == 1:
+                doc_store_parents.write_documents([doc])
+
+        leaf_docs = [doc for doc in docs["documents"] if not doc.meta["__children_ids"]]
+        retriever = AutoMergingRetriever(doc_store_parents, threshold=0.6)
+        result = await retriever.run_async([leaf_docs[4]])
+
+        assert len(result["documents"]) == 1
+        assert result["documents"][0].content == "eastern side of "
+        assert result["documents"][0].meta["__parent_id"] == docs["documents"][2].id
+
+    @pytest.mark.asyncio
+    async def test_run_return_leafs_document_different_parents(self):
+        docs = [Document(content="The monarch of the wild blue yonder rises from the eastern side of the horizon.")]
+        builder = HierarchicalDocumentSplitter(block_sizes={10, 3}, split_overlap=0, split_by="word")
+        docs = builder.run(docs)
+
+        doc_store_parents = InMemoryDocumentStore()
+        for doc in docs["documents"]:
+            if doc.meta["__level"] == 1:
+                doc_store_parents.write_documents([doc])
+
+        leaf_docs = [doc for doc in docs["documents"] if not doc.meta["__children_ids"]]
+        retriever = AutoMergingRetriever(doc_store_parents, threshold=0.6)
+        result = await retriever.run_async([leaf_docs[4], leaf_docs[3]])
+
+        assert len(result["documents"]) == 2
+        assert result["documents"][0].meta["__parent_id"] != result["documents"][1].meta["__parent_id"]
+
+    @pytest.mark.asyncio
+    async def test_run_go_up_hierarchy_multiple_levels(self):
+        """
+        Test if the retriever can go up the hierarchy multiple levels to find the parent document.
+
+        Simulate a scenario where we have 4 leaf-documents that matched some initial query. The leaf-documents
+        are continuously merged up the hierarchy until the threshold is no longer met.
+        In this case it goes from the 4th level in the hierarchy up to the 1st level.
+        """
+        text = "The sun rose early in the morning. It cast a warm glow over the trees. Birds began to sing."
+
+        docs = [Document(content=text)]
+        builder = HierarchicalDocumentSplitter(block_sizes={6, 4, 2, 1}, split_overlap=0, split_by="word")
+        docs = builder.run(docs)
+
+        # store all non-leaf documents
+        doc_store_parents = InMemoryDocumentStore()
+        for doc in docs["documents"]:
+            if doc.meta["__children_ids"]:
+                doc_store_parents.write_documents([doc])
+        retriever = AutoMergingRetriever(doc_store_parents, threshold=0.4)
+
+        # simulate a scenario where we have 4 leaf-documents that matched some initial query
+        retrieved_leaf_docs = [d for d in docs["documents"] if d.content in {"The ", "sun ", "rose ", "early "}]
+
+        result = await retriever.run_async(retrieved_leaf_docs)
+
+        assert len(result["documents"]) == 1
+        assert result["documents"][0].content == "The sun rose early in the "
+
+    @pytest.mark.asyncio
+    async def test_run_go_up_hierarchy_multiple_levels_hit_root_document(self):
+        """
+        Test case where we go up hierarchy until the root document, so the root document is returned.
+
+        It's the only document in the hierarchy which has no parent.
+        """
+        text = "The sun rose early in the morning. It cast a warm glow over the trees. Birds began to sing."
+
+        docs = [Document(content=text)]
+        builder = HierarchicalDocumentSplitter(block_sizes={6, 4}, split_overlap=0, split_by="word")
+        docs = builder.run(docs)
+
+        # store all non-leaf documents
+        doc_store_parents = InMemoryDocumentStore()
+        for doc in docs["documents"]:
+            if doc.meta["__children_ids"]:
+                doc_store_parents.write_documents([doc])
+        retriever = AutoMergingRetriever(doc_store_parents, threshold=0.1)  # set a low threshold to hit root document
+
+        # simulate a scenario where we have 4 leaf-documents that matched some initial query
+        retrieved_leaf_docs = [
+            d
+            for d in docs["documents"]
+            if d.content in {"The sun rose early ", "in the ", "morning. It cast a ", "over the trees. Birds "}
+        ]
+
+        result = await retriever.run_async(retrieved_leaf_docs)
+
+        assert len(result["documents"]) == 1
+        assert result["documents"][0].meta["__level"] == 0  # hit root document
diff --git a/test/components/retrievers/test_filter_retriever_async.py b/test/components/retrievers/test_filter_retriever_async.py
new file mode 100644
index 0000000000..a8668aadd8
--- /dev/null
+++ b/test/components/retrievers/test_filter_retriever_async.py
@@ -0,0 +1,96 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Any
+
+import pytest
+
+from haystack import AsyncPipeline, DeserializationError, Pipeline
+from haystack.components.retrievers.filter_retriever import FilterRetriever
+from haystack.dataclasses import Document
+from haystack.document_stores.in_memory import InMemoryDocumentStore
+from haystack.testing.factory import document_store_class
+
+
+@pytest.fixture()
+def sample_docs():
+    en_docs = [
+        Document(content="Javascript is a popular programming language", meta={"lang": "en"}),
+        Document(content="Python is a popular programming language", meta={"lang": "en"}),
+        Document(content="A chromosome is a package of DNA ", meta={"lang": "en"}),
+    ]
+    de_docs = [
+        Document(content="python ist eine beliebte Programmiersprache", meta={"lang": "de"}),
+        Document(content="javascript ist eine beliebte Programmiersprache", meta={"lang": "de"}),
+    ]
+    all_docs = en_docs + de_docs
+    return {"en_docs": en_docs, "de_docs": de_docs, "all_docs": all_docs}
+
+
+@pytest.fixture()
+def sample_document_store(sample_docs):
+    doc_store = InMemoryDocumentStore()
+    doc_store.write_documents(sample_docs["all_docs"])
+    return doc_store
+
+
+class TestFilterRetrieverAsync:
+    @classmethod
+    def _documents_equal(cls, docs1: list[Document], docs2: list[Document]) -> bool:
+        # Order doesn't matter; both lists are sorted in place by id before comparing
+        docs1.sort(key=lambda x: x.id)
+        docs2.sort(key=lambda x: x.id)
+        return docs1 == docs2
+
+    @pytest.mark.asyncio
+    async def test_retriever_init_filter(self, sample_document_store, sample_docs):
+        retriever = FilterRetriever(sample_document_store, filters={"field": "lang", "operator": "==", "value": "en"})
+        result = await retriever.run_async()
+
+        assert "documents" in result
+        assert len(result["documents"]) == 3
+        assert TestFilterRetrieverAsync._documents_equal(result["documents"], sample_docs["en_docs"])
+
+    @pytest.mark.asyncio
+    async def test_retriever_runtime_filter(self, sample_document_store, sample_docs):
+        retriever = FilterRetriever(sample_document_store)
+        result = await retriever.run_async(filters={"field": "lang", "operator": "==", "value": "en"})
+
+        assert "documents" in result
+        assert len(result["documents"]) == 3
+        assert TestFilterRetrieverAsync._documents_equal(result["documents"], sample_docs["en_docs"])
+
+    @pytest.mark.asyncio
+    async def test_retriever_init_filter_run_filter_override(self, sample_document_store, sample_docs):
+        retriever = FilterRetriever(sample_document_store, filters={"field": "lang", "operator": "==", "value": "en"})
+        result = await retriever.run_async(filters={"field": "lang", "operator": "==", "value": "de"})
+
+        assert "documents" in result
+        assert len(result["documents"]) == 2
+        assert TestFilterRetrieverAsync._documents_equal(result["documents"], sample_docs["de_docs"])
+
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_run_with_pipeline(self, sample_document_store, sample_docs):
+        retriever = FilterRetriever(sample_document_store, filters={"field": "lang", "operator": "==", "value": "de"})
+
+        pipeline = AsyncPipeline()
+        pipeline.add_component("retriever", retriever)
+        result: dict[str, Any] = await pipeline.run_async(data={"retriever": {}})
+
+        assert result
+        assert "retriever" in result
+        results_docs = result["retriever"]["documents"]
+        assert results_docs
+        assert TestFilterRetrieverAsync._documents_equal(results_docs, sample_docs["de_docs"])
+
+        result: dict[str, Any] = await pipeline.run_async(
+            data={"retriever": {"filters": {"field": "lang", "operator": "==", "value": "en"}}}
+        )
+
+        assert result
+        assert "retriever" in result
+        results_docs = result["retriever"]["documents"]
+        assert results_docs
+        assert TestFilterRetrieverAsync._documents_equal(results_docs, sample_docs["en_docs"])