Skip to content

Commit 7c40de0

Browse files
committed
Functional approach
Without additional inheritance in individual storage clients.
1 parent 38df4eb commit 7c40de0

File tree

9 files changed

+505
-409
lines changed

9 files changed

+505
-409
lines changed
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from asyncio import Lock
5+
from logging import getLogger
6+
from typing import TYPE_CHECKING, ClassVar, overload
7+
8+
from apify_client import ApifyClientAsync
9+
10+
from ._utils import hash_api_base_url_and_token
11+
from apify._configuration import Configuration
12+
13+
if TYPE_CHECKING:
14+
from collections.abc import Callable
15+
from types import TracebackType
16+
17+
from apify_client.clients import (
18+
DatasetClientAsync,
19+
DatasetCollectionClientAsync,
20+
KeyValueStoreClientAsync,
21+
KeyValueStoreCollectionClientAsync,
22+
RequestQueueClientAsync,
23+
RequestQueueCollectionClientAsync,
24+
)
25+
from crawlee.storages import Dataset, KeyValueStore, RequestQueue
26+
27+
28+
logger = getLogger(__name__)
29+
30+
31+
@overload
32+
async def open_by_alias(
33+
*,
34+
alias: str,
35+
storage_class: type[KeyValueStore],
36+
collection_client: KeyValueStoreCollectionClientAsync,
37+
get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync],
38+
configuration: Configuration,
39+
) -> KeyValueStoreClientAsync: ...
40+
41+
42+
@overload
43+
async def open_by_alias(
44+
*,
45+
alias: str,
46+
storage_class: type[RequestQueue],
47+
collection_client: RequestQueueCollectionClientAsync,
48+
get_resource_client_by_id: Callable[[str], RequestQueueClientAsync],
49+
configuration: Configuration,
50+
) -> RequestQueueClientAsync: ...
51+
52+
53+
@overload
54+
async def open_by_alias(
55+
*,
56+
alias: str,
57+
storage_class: type[Dataset],
58+
collection_client: DatasetCollectionClientAsync,
59+
get_resource_client_by_id: Callable[[str], DatasetClientAsync],
60+
configuration: Configuration,
61+
) -> DatasetClientAsync: ...
62+
63+
64+
async def open_by_alias(
65+
*,
66+
alias: str,
67+
storage_class: type[Dataset | KeyValueStore | RequestQueue],
68+
collection_client: (
69+
KeyValueStoreCollectionClientAsync | RequestQueueCollectionClientAsync | DatasetCollectionClientAsync
70+
),
71+
get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync],
72+
configuration: Configuration,
73+
) -> KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync:
74+
"""Open storage by alias, creating it if necessary.
75+
76+
This function resolves storage aliases to their IDs, creating new unnamed storage if needed.
77+
The alias mapping is stored in the default key-value store for persistence across Actor runs.
78+
79+
Args:
80+
alias: The alias name for the storage (e.g., '__default__', 'my-storage').
81+
storage_class: The storage class type (KeyValueStore, RequestQueue, or Dataset).
82+
collection_client: The Apify API collection client for the storage type.
83+
get_resource_client_by_id: A callable that takes a storage ID and returns the resource client.
84+
configuration: Configuration object containing API credentials and settings.
85+
86+
Returns:
87+
The storage client for the opened or created storage.
88+
89+
Raises:
90+
ValueError: If storage ID cannot be determined from API response.
91+
TypeError: If API response format is unexpected.
92+
"""
93+
async with AliasResolver(
94+
storage_type=storage_class,
95+
alias=alias,
96+
configuration=configuration,
97+
) as alias_resolver:
98+
storage_id = await alias_resolver.resolve_id()
99+
100+
if storage_id:
101+
# Check if storage with this ID exists
102+
resource_client = get_resource_client_by_id(storage_id)
103+
raw_metadata = await resource_client.get()
104+
if raw_metadata:
105+
return resource_client
106+
107+
# Create new unnamed storage and store alias mapping
108+
raw_metadata = await collection_client.get_or_create()
109+
110+
# Determine metadata class to parse the ID
111+
if isinstance(raw_metadata, dict):
112+
storage_id = raw_metadata.get('id')
113+
if not storage_id:
114+
raise ValueError('Failed to get storage ID from API response')
115+
else:
116+
raise TypeError('Unexpected API response format')
117+
118+
await alias_resolver.store_mapping(storage_id=storage_id)
119+
return get_resource_client_by_id(storage_id)
120+
121+
122+
class AliasResolver:
123+
"""Class for handling aliases.
124+
125+
The purpose of this is class is to ensure that alias storages are created with correct id. This is achieved by using
126+
default kvs as a storage for global mapping of aliases to storage ids. Same mapping is also kept in memory to avoid
127+
unnecessary calls to API and also have limited support of alias storages when not running on Apify platform. When on
128+
Apify platform, the storages created with alias are accessible by the same alias even after migration or reboot.
129+
"""
130+
131+
_alias_map: ClassVar[dict[str, str]] = {}
132+
"""Map containing pre-existing alias storages and their ids. Global for all instances."""
133+
134+
_alias_init_lock: Lock | None = None
135+
"""Lock for creating alias storages. Only one alias storage can be created at the time. Global for all instances."""
136+
137+
_ALIAS_STORAGE_KEY_SEPARATOR = ','
138+
_ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING'
139+
140+
def __init__(
141+
self,
142+
storage_type: type[Dataset | KeyValueStore | RequestQueue],
143+
alias: str,
144+
configuration: Configuration,
145+
) -> None:
146+
self._storage_type = storage_type
147+
self._alias = alias
148+
self._configuration = configuration
149+
self._additional_cache_key = hash_api_base_url_and_token(configuration)
150+
151+
async def __aenter__(self) -> AliasResolver:
152+
"""Context manager to prevent race condition in alias creation."""
153+
lock = await self._get_alias_init_lock()
154+
await lock.acquire()
155+
return self
156+
157+
async def __aexit__(
158+
self,
159+
exc_type: type[BaseException] | None,
160+
exc_value: BaseException | None,
161+
exc_traceback: TracebackType | None,
162+
) -> None:
163+
lock = await self._get_alias_init_lock()
164+
lock.release()
165+
166+
@classmethod
167+
async def _get_alias_init_lock(cls) -> Lock:
168+
"""Get lock for controlling the creation of the alias storages.
169+
170+
The lock is shared for all instances of the AliasResolver class.
171+
It is created in async method to ensure that some event loop is already running.
172+
"""
173+
if cls._alias_init_lock is None:
174+
cls._alias_init_lock = Lock()
175+
return cls._alias_init_lock
176+
177+
@classmethod
178+
async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]:
179+
"""Get the aliases and storage ids mapping from the default kvs.
180+
181+
Mapping is loaded from kvs only once and is shared for all instances of the _AliasResolver class.
182+
183+
Args:
184+
configuration: Configuration object to use for accessing the default KVS.
185+
186+
Returns:
187+
Map of aliases and storage ids.
188+
"""
189+
if not cls._alias_map and Configuration.get_global_configuration().is_at_home:
190+
default_kvs_client = await cls._get_default_kvs_client(configuration)
191+
192+
record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY)
193+
194+
# get_record can return {key: ..., value: ..., content_type: ...}
195+
if isinstance(record, dict):
196+
if 'value' in record and isinstance(record['value'], dict):
197+
cls._alias_map = record['value']
198+
else:
199+
cls._alias_map = record
200+
else:
201+
cls._alias_map = dict[str, str]()
202+
203+
return cls._alias_map
204+
205+
async def resolve_id(self) -> str | None:
206+
"""Get id of the aliased storage.
207+
208+
Returns:
209+
Storage id if it exists, None otherwise.
210+
"""
211+
return (await self._get_alias_map(self._configuration)).get(self._storage_key, None)
212+
213+
async def store_mapping(self, storage_id: str) -> None:
214+
"""Add alias and related storage id to the mapping in default kvs and local in-memory mapping."""
215+
# Update in-memory mapping
216+
alias_map = await self._get_alias_map(self._configuration)
217+
alias_map[self._storage_key] = storage_id
218+
219+
if not Configuration.get_global_configuration().is_at_home:
220+
logging.getLogger(__name__).debug(
221+
'_AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.'
222+
)
223+
return
224+
225+
default_kvs_client = await self._get_default_kvs_client(self._configuration)
226+
await default_kvs_client.get()
227+
228+
try:
229+
record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY)
230+
231+
# get_record can return {key: ..., value: ..., content_type: ...}
232+
if isinstance(record, dict) and 'value' in record:
233+
record = record['value']
234+
235+
# Update or create the record with the new alias mapping
236+
if isinstance(record, dict):
237+
record[self._storage_key] = storage_id
238+
else:
239+
record = {self._storage_key: storage_id}
240+
241+
# Store the mapping back in the KVS.
242+
await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record)
243+
except Exception as exc:
244+
logger.warning(f'Error storing alias mapping for {self._alias}: {exc}')
245+
246+
@property
247+
def _storage_key(self) -> str:
248+
"""Get a unique storage key used for storing the alias in the mapping."""
249+
return self._ALIAS_STORAGE_KEY_SEPARATOR.join(
250+
[
251+
self._storage_type.__name__,
252+
self._alias,
253+
self._additional_cache_key,
254+
]
255+
)
256+
257+
@staticmethod
258+
async def _get_default_kvs_client(configuration: Configuration) -> KeyValueStoreClientAsync:
259+
"""Get a client for the default key-value store."""
260+
apify_client_async = ApifyClientAsync(
261+
token=configuration.token,
262+
api_url=configuration.api_base_url,
263+
max_retries=8,
264+
min_delay_between_retries_millis=500,
265+
timeout_secs=360,
266+
)
267+
268+
if not configuration.default_key_value_store_id:
269+
raise ValueError("'Configuration.default_key_value_store_id' must be set.")
270+
271+
return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id)

0 commit comments

Comments
 (0)