Skip to content

Commit bc4767c

Browse files
Add support for .annotate() queries with tests to verify functionality
Co-Authored-By: Nishant Singh <saysnishant@gmail.com>
1 parent bedad57 commit bc4767c

File tree

1 file changed

+235
-0
lines changed

1 file changed

+235
-0
lines changed
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
from datetime import datetime, timezone
2+
from decimal import Decimal
3+
4+
from django.db.models import Count, F, Value, IntegerField, Sum
5+
from django.test import TransactionTestCase
6+
from model_bakery import baker
7+
8+
from django_querysets_single_query_fetch.service import (
9+
QuerysetsSingleQueryFetch,
10+
QuerysetCountWrapper,
11+
)
12+
from testapp.models import OnlineStore, StoreProduct, StoreProductCategory
13+
14+
15+
class QuerysetAnnotatePostgresTestCase(TransactionTestCase):
16+
def setUp(self) -> None:
17+
self.today = datetime.now(tz=timezone.utc)
18+
self.store = baker.make(OnlineStore, expired_on=self.today)
19+
self.store = OnlineStore.objects.get(
20+
id=self.store.id
21+
) # force refresh from db so that types are the default
22+
self.category1 = baker.make(StoreProductCategory, store=self.store)
23+
self.category2 = baker.make(StoreProductCategory, store=self.store)
24+
self.product_1 = baker.make(StoreProduct, store=self.store, selling_price=50.22)
25+
self.product_2 = baker.make(
26+
StoreProduct,
27+
store=self.store,
28+
category=self.category1,
29+
selling_price=100.33,
30+
)
31+
self.product_3 = baker.make(
32+
StoreProduct, store=self.store, category=self.category1, selling_price=75.50
33+
)
34+
self.product_4 = baker.make(
35+
StoreProduct,
36+
store=self.store,
37+
category=self.category2,
38+
selling_price=120.75,
39+
)
40+
41+
def test_simple_annotate_with_constant(self):
42+
"""Test annotating with a constant value"""
43+
queryset = StoreProduct.objects.filter().annotate(test_val=Value(10))
44+
45+
with self.assertNumQueries(1):
46+
results = QuerysetsSingleQueryFetch(querysets=[queryset]).execute()
47+
48+
self.assertEqual(len(results), 1)
49+
products = results[0]
50+
regular_products = list(queryset)
51+
52+
self.assertEqual(len(products), len(regular_products))
53+
for product, regular_product in zip(products, regular_products):
54+
self.assertEqual(product.test_val, regular_product.test_val)
55+
self.assertEqual(product.test_val, 10)
56+
57+
def test_annotate_with_count(self):
58+
"""Test annotating with Count"""
59+
queryset = StoreProductCategory.objects.filter().annotate(
60+
product_count=Count("storeproduct")
61+
)
62+
63+
with self.assertNumQueries(1):
64+
results = QuerysetsSingleQueryFetch(querysets=[queryset]).execute()
65+
66+
self.assertEqual(len(results), 1)
67+
categories = results[0]
68+
regular_categories = list(queryset)
69+
70+
self.assertEqual(len(categories), len(regular_categories))
71+
for category, regular_category in zip(categories, regular_categories):
72+
self.assertEqual(category.product_count, regular_category.product_count)
73+
74+
category_counts = {
75+
category.id: category.product_count for category in categories
76+
}
77+
self.assertEqual(category_counts[self.category1.id], 2) # Two products
78+
self.assertEqual(category_counts[self.category2.id], 1) # One product
79+
80+
def test_annotate_with_f_expression(self):
81+
"""Test annotating with F expression"""
82+
queryset = StoreProduct.objects.filter().annotate(
83+
doubled_price=F("selling_price") * 2
84+
)
85+
86+
with self.assertNumQueries(1):
87+
results = QuerysetsSingleQueryFetch(querysets=[queryset]).execute()
88+
89+
self.assertEqual(len(results), 1)
90+
products = results[0]
91+
regular_products = list(queryset)
92+
93+
self.assertEqual(len(products), len(regular_products))
94+
for product, regular_product in zip(products, regular_products):
95+
self.assertAlmostEqual(
96+
float(product.doubled_price),
97+
float(regular_product.doubled_price),
98+
places=2,
99+
)
100+
self.assertAlmostEqual(
101+
float(product.doubled_price), float(product.selling_price * 2), places=2
102+
)
103+
104+
def test_multiple_annotations(self):
105+
"""Test multiple annotations in a single queryset"""
106+
queryset = StoreProductCategory.objects.filter().annotate(
107+
product_count=Count("storeproduct"),
108+
test_val=Value(5, output_field=IntegerField()),
109+
)
110+
111+
with self.assertNumQueries(1):
112+
results = QuerysetsSingleQueryFetch(querysets=[queryset]).execute()
113+
114+
self.assertEqual(len(results), 1)
115+
categories = results[0]
116+
regular_categories = list(queryset)
117+
118+
self.assertEqual(len(categories), len(regular_categories))
119+
for category, regular_category in zip(categories, regular_categories):
120+
self.assertEqual(category.product_count, regular_category.product_count)
121+
self.assertEqual(category.test_val, regular_category.test_val)
122+
self.assertEqual(category.test_val, 5)
123+
124+
def test_empty_queryset_with_annotation(self):
125+
"""Test annotating an empty queryset"""
126+
queryset = StoreProduct.objects.none().annotate(test_val=Value(10))
127+
128+
with self.assertNumQueries(0):
129+
results = QuerysetsSingleQueryFetch(querysets=[queryset]).execute()
130+
131+
self.assertEqual(len(results), 1)
132+
self.assertEqual(results[0], [])
133+
134+
def test_aggregate_query_with_count_inside_annotate(self):
135+
"""Test aggregate query with Count inside annotate"""
136+
queryset = (
137+
StoreProductCategory.objects.filter()
138+
.annotate(product_count=Count("storeproduct"))
139+
.filter(product_count__gt=0)
140+
)
141+
142+
with self.assertNumQueries(1):
143+
results = QuerysetsSingleQueryFetch(querysets=[queryset]).execute()
144+
145+
self.assertEqual(len(results), 1)
146+
categories = results[0]
147+
regular_categories = list(queryset)
148+
149+
self.assertEqual(len(categories), len(regular_categories))
150+
for category, regular_category in zip(categories, regular_categories):
151+
self.assertEqual(category.product_count, regular_category.product_count)
152+
153+
for category in categories:
154+
self.assertGreater(category.product_count, 0)
155+
156+
def test_mix_of_annotated_and_regular_querysets(self):
157+
"""Test mixture of annotated and regular querysets"""
158+
annotated_queryset = StoreProductCategory.objects.filter().annotate(
159+
product_count=Count("storeproduct")
160+
)
161+
regular_queryset = StoreProduct.objects.filter()
162+
163+
with self.assertNumQueries(1):
164+
results = QuerysetsSingleQueryFetch(
165+
querysets=[annotated_queryset, regular_queryset]
166+
).execute()
167+
168+
self.assertEqual(len(results), 2)
169+
categories = results[0]
170+
products = results[1]
171+
172+
regular_categories = list(annotated_queryset)
173+
regular_products = list(regular_queryset)
174+
175+
self.assertEqual(len(categories), len(regular_categories))
176+
self.assertEqual(len(products), len(regular_products))
177+
178+
for category, regular_category in zip(categories, regular_categories):
179+
self.assertEqual(category.product_count, regular_category.product_count)
180+
181+
def test_mix_with_count_wrapper_and_annotated_queryset(self):
182+
"""Test mixture of count wrapper and annotated queryset"""
183+
count_queryset = StoreProduct.objects.filter()
184+
annotated_queryset = StoreProductCategory.objects.filter().annotate(
185+
product_count=Count("storeproduct")
186+
)
187+
188+
with self.assertNumQueries(1):
189+
results = QuerysetsSingleQueryFetch(
190+
querysets=[
191+
QuerysetCountWrapper(queryset=count_queryset),
192+
annotated_queryset,
193+
]
194+
).execute()
195+
196+
self.assertEqual(len(results), 2)
197+
product_count = results[0]
198+
categories = results[1]
199+
200+
self.assertEqual(product_count, count_queryset.count())
201+
202+
regular_categories = list(annotated_queryset)
203+
self.assertEqual(len(categories), len(regular_categories))
204+
205+
for category, regular_category in zip(categories, regular_categories):
206+
self.assertEqual(category.product_count, regular_category.product_count)
207+
208+
def test_complex_annotate_with_aggregation(self):
209+
"""Test complex annotation with aggregation"""
210+
queryset = StoreProduct.objects.values("store").annotate(
211+
total_price=Sum("selling_price"), product_count=Count("id")
212+
)
213+
214+
with self.assertNumQueries(1):
215+
results = QuerysetsSingleQueryFetch(querysets=[queryset]).execute()
216+
217+
self.assertEqual(len(results), 1)
218+
aggregated_data = results[0]
219+
regular_aggregated_data = list(queryset)
220+
221+
self.assertEqual(len(aggregated_data), len(regular_aggregated_data))
222+
for item, regular_item in zip(aggregated_data, regular_aggregated_data):
223+
self.assertAlmostEqual(
224+
float(item["total_price"]), float(regular_item["total_price"]), places=2
225+
)
226+
self.assertEqual(item["product_count"], regular_item["product_count"])
227+
228+
self.assertEqual(len(aggregated_data), 1) # Only one store
229+
self.assertEqual(aggregated_data[0]["product_count"], 4) # Four products
230+
expected_total = (
231+
Decimal("50.22") + Decimal("100.33") + Decimal("75.50") + Decimal("120.75")
232+
)
233+
self.assertAlmostEqual(
234+
float(aggregated_data[0]["total_price"]), float(expected_total), places=2
235+
)

0 commit comments

Comments
 (0)