
refactor!: Introduce new storage clients #1107


Draft
vdusek wants to merge 24 commits into master from memory-storage-refactor

Commits (24)
3d461c5
First step - get rid of collections and unused methods
vdusek Mar 11, 2025
c081025
Update of dataset and its clients
vdusek Mar 14, 2025
0ff7e67
Update storage clients
vdusek Mar 20, 2025
a805d9a
Update KVS and its clients
vdusek Apr 8, 2025
32dfdf6
Dataset export methods
vdusek Apr 11, 2025
eed4c5a
Inherit from Storage class and RQ init
vdusek Apr 11, 2025
e74ae8e
Add Dataset & KVS file system clients tests
vdusek Apr 11, 2025
8df87f9
Memory storage clients and their tests
vdusek Apr 12, 2025
026cbf9
Caching of Dataset and KVS
vdusek Apr 14, 2025
8554115
Storage clients (entrypoints) and their tests
vdusek Apr 14, 2025
c051f65
Dataset and KVS tests
vdusek Apr 14, 2025
55abd88
Init of request queue and its clients
vdusek Apr 14, 2025
b833f91
Utilize pathlib and use Config in constructors
vdusek Apr 15, 2025
834713f
RQ and Apify clients (will be moved to SDK later)
vdusek Apr 17, 2025
84f9f12
Add init version of RQ and its clients
vdusek Apr 17, 2025
98dfdaf
Add tests for RQ
vdusek Apr 22, 2025
2c10b75
Update and add tests for the RQ storage clients
vdusek Apr 24, 2025
6f6910e
Dataset list items
vdusek Apr 25, 2025
c4d5da8
Rm pytest mark only
vdusek Apr 25, 2025
bb1eb9b
Merge branch 'master' into memory-storage-refactor
vdusek Apr 25, 2025
fa037d1
Fix lint & type checks and a few tests
vdusek Apr 26, 2025
bb74715
Move Apify storage clients to SDK
vdusek Apr 28, 2025
1780afe
Improve caching and fix in KVS
vdusek Apr 29, 2025
d8de6c6
Merge branch 'master' into memory-storage-refactor
vdusek Apr 29, 2025
15 changes: 7 additions & 8 deletions docs/deployment/code_examples/google/cloud_run_example.py
@@ -5,24 +5,23 @@
import uvicorn
from litestar import Litestar, get

from crawlee import service_locator
from crawlee.crawlers import PlaywrightCrawler, PlaywrightCrawlingContext

# highlight-start
# Disable writing storage data to the file system
configuration = service_locator.get_configuration()
configuration.persist_storage = False
configuration.write_metadata = False
# highlight-end
from crawlee.storage_clients import MemoryStorageClient


@get('/')
async def main() -> str:
"""The crawler entry point that will be called when the HTTP endpoint is accessed."""
# highlight-start
# Disable writing storage data to the file system
storage_client = MemoryStorageClient()
# highlight-end

crawler = PlaywrightCrawler(
headless=True,
max_requests_per_crawl=10,
browser_type='firefox',
storage_client=storage_client,
)

@crawler.router.default_handler
15 changes: 7 additions & 8 deletions docs/deployment/code_examples/google/google_example.py
@@ -6,22 +6,21 @@
import functions_framework
from flask import Request, Response

from crawlee import service_locator
from crawlee.crawlers import (
BeautifulSoupCrawler,
BeautifulSoupCrawlingContext,
)

# highlight-start
# Disable writing storage data to the file system
configuration = service_locator.get_configuration()
configuration.persist_storage = False
configuration.write_metadata = False
# highlight-end
from crawlee.storage_clients import MemoryStorageClient


async def main() -> str:
# highlight-start
# Disable writing storage data to the file system
storage_client = MemoryStorageClient()
# highlight-end

crawler = BeautifulSoupCrawler(
storage_client=storage_client,
max_request_retries=1,
request_handler_timeout=timedelta(seconds=30),
max_requests_per_crawl=10,
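
Taken together, the two deployment examples above replace the global configuration flags (`persist_storage`, `write_metadata`) with an explicit storage client passed to the crawler. A minimal consolidated sketch of that pattern, using only the constructor and `storage_client` parameter as they appear in these diffs:

```python
import asyncio

from crawlee.crawlers import PlaywrightCrawler, PlaywrightCrawlingContext
from crawlee.storage_clients import MemoryStorageClient


async def main() -> None:
    # Keep all storage in memory so nothing is written to the file system.
    storage_client = MemoryStorageClient()

    crawler = PlaywrightCrawler(
        storage_client=storage_client,
        headless=True,
        max_requests_per_crawl=10,
    )

    @crawler.router.default_handler
    async def request_handler(context: PlaywrightCrawlingContext) -> None:
        await context.push_data({'url': context.request.url})

    await crawler.run(['https://crawlee.dev'])


if __name__ == '__main__':
    asyncio.run(main())
```
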
@@ -30,7 +30,7 @@ async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
await crawler.run(['https://crawlee.dev'])

# Export the entire dataset to a CSV file.
await crawler.export_data_csv(path='results.csv')
await crawler.export_data(path='results.csv')


if __name__ == '__main__':

@@ -30,7 +30,7 @@ async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
await crawler.run(['https://crawlee.dev'])

# Export the entire dataset to a JSON file.
await crawler.export_data_json(path='results.json')
await crawler.export_data(path='results.json')


if __name__ == '__main__':
2 changes: 1 addition & 1 deletion docs/examples/code_examples/parsel_crawler.py
@@ -40,7 +40,7 @@ async def some_hook(context: BasicCrawlingContext) -> None:
await crawler.run(['https://github.com'])

# Export the entire dataset to a JSON file.
await crawler.export_data_json(path='results.json')
await crawler.export_data(path='results.json')


if __name__ == '__main__':
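
The three example updates above collapse the format-specific helpers (`export_data_csv`, `export_data_json`) into a single `export_data` call. A short sketch of the resulting usage; the assumption that the output format is inferred from the path suffix is based only on these examples:

```python
import asyncio

from crawlee.crawlers import BeautifulSoupCrawler, BeautifulSoupCrawlingContext


async def main() -> None:
    crawler = BeautifulSoupCrawler(max_requests_per_crawl=10)

    @crawler.router.default_handler
    async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
        await context.push_data({'url': context.request.url})

    await crawler.run(['https://crawlee.dev'])

    # One helper for both formats; the format is presumably derived from the suffix.
    await crawler.export_data(path='results.json')
    await crawler.export_data(path='results.csv')


if __name__ == '__main__':
    asyncio.run(main())
```
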

This file was deleted.

2 changes: 1 addition & 1 deletion docs/guides/code_examples/storages/rq_basic_example.py
@@ -12,7 +12,7 @@ async def main() -> None:
await request_queue.add_request('https://apify.com/')

# Add multiple requests as a batch.
await request_queue.add_requests_batched(
await request_queue.add_requests(
['https://crawlee.dev/', 'https://crawlee.dev/python/']
)

@@ -10,9 +10,7 @@ async def main() -> None:
request_queue = await RequestQueue.open(name='my-request-queue')

# Interact with the request queue directly, e.g. add a batch of requests.
await request_queue.add_requests_batched(
['https://apify.com/', 'https://crawlee.dev/']
)
await request_queue.add_requests(['https://apify.com/', 'https://crawlee.dev/'])

# Create a new crawler (it can be any subclass of BasicCrawler) and pass the request
# list as request manager to it. It will be managed by the crawler.
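
Both request-queue examples above rename the batch helper from `add_requests_batched` to `add_requests`. A minimal sketch combining the two call sites; the import path is the usual `crawlee.storages` one, which sits above these hunks and is not shown in the diff:

```python
import asyncio

from crawlee.storages import RequestQueue


async def main() -> None:
    request_queue = await RequestQueue.open(name='my-request-queue')

    # Add a single request.
    await request_queue.add_request('https://apify.com/')

    # Batch insertion now goes through `add_requests` (formerly `add_requests_batched`).
    await request_queue.add_requests(['https://crawlee.dev/', 'https://crawlee.dev/python/'])


if __name__ == '__main__':
    asyncio.run(main())
```
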
4 changes: 2 additions & 2 deletions docs/guides/request_loaders.mdx
@@ -52,12 +52,12 @@ class BaseStorage {

class RequestLoader {
<<abstract>>
+ handled_count
+ total_count
+ fetch_next_request()
+ mark_request_as_handled()
+ is_empty()
+ is_finished()
+ get_handled_count()
+ get_total_count()
+ to_tandem()
}

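
In the class diagram above, the `handled_count` and `total_count` properties become `get_handled_count()` and `get_total_count()` methods. A rough Python reconstruction of the abstract interface as drawn in the updated diagram; the async signatures and return types are assumptions, not taken from this diff:

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from crawlee import Request  # Import path assumed; not part of this diff.


class RequestLoader(ABC):
    """Abstract request source, as sketched in the updated diagram.

    The diagram also lists `to_tandem()`, omitted here for brevity.
    """

    @abstractmethod
    async def get_handled_count(self) -> int:
        """Return the number of handled requests (replaces the `handled_count` property)."""

    @abstractmethod
    async def get_total_count(self) -> int:
        """Return the total number of known requests (replaces the `total_count` property)."""

    @abstractmethod
    async def fetch_next_request(self) -> Request | None:
        """Return the next request to process, or None if none is available."""

    @abstractmethod
    async def mark_request_as_handled(self, request: Request) -> None:
        """Mark a request returned by `fetch_next_request` as handled."""

    @abstractmethod
    async def is_empty(self) -> bool:
        """Return True if there are currently no more requests to fetch."""

    @abstractmethod
    async def is_finished(self) -> bool:
        """Return True if all requests have been handled."""
```
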
7 changes: 0 additions & 7 deletions docs/guides/storages.mdx
@@ -24,7 +24,6 @@ import KvsWithCrawlerExample from '!!raw-loader!roa-loader!./code_examples/stora
import KvsWithCrawlerExplicitExample from '!!raw-loader!roa-loader!./code_examples/storages/kvs_with_crawler_explicit_example.py';

import CleaningDoNotPurgeExample from '!!raw-loader!roa-loader!./code_examples/storages/cleaning_do_not_purge_example.py';
import CleaningPurgeExplicitlyExample from '!!raw-loader!roa-loader!./code_examples/storages/cleaning_purge_explicitly_example.py';

Crawlee offers multiple storage types for managing and persisting your crawling data. Request-oriented storages, such as the <ApiLink to="class/RequestQueue">`RequestQueue`</ApiLink>, help you store and deduplicate URLs, while result-oriented storages, like <ApiLink to="class/Dataset">`Dataset`</ApiLink> and <ApiLink to="class/KeyValueStore">`KeyValueStore`</ApiLink>, focus on storing and retrieving scraping results. This guide helps you choose the storage type that suits your needs.

@@ -210,12 +209,6 @@ Default storages are purged before the crawler starts, unless explicitly configu

If you do not explicitly interact with storages in your code, the purging will occur automatically when the <ApiLink to="class/BasicCrawler#run">`BasicCrawler.run`</ApiLink> method is invoked.

If you need to purge storages earlier, you can call <ApiLink to="class/MemoryStorageClient#purge_on_start">`MemoryStorageClient.purge_on_start`</ApiLink> directly if you are using the default storage client. This method triggers the purging process for the underlying storage implementation you are currently using.

<RunnableCodeBlock className="language-python" language="python">
{CleaningPurgeExplicitlyExample}
</RunnableCodeBlock>

## Conclusion

This guide introduced you to the different storage types available in Crawlee and how to interact with them. You learned how to manage requests and store and retrieve scraping results using the `RequestQueue`, `Dataset`, and `KeyValueStore`. You also discovered how to use helper functions to simplify interactions with these storages. Finally, you learned how to clean up storages before starting a crawler run and how to purge them explicitly. If you have questions or need assistance, feel free to reach out on our [GitHub](https://github.com/apify/crawlee-python) or join our [Discord community](https://discord.com/invite/jyEM2PRvMU). Happy scraping!
6 changes: 3 additions & 3 deletions pyproject.toml
@@ -93,8 +93,8 @@ crawlee = "crawlee._cli:cli"

[dependency-groups]
dev = [
"apify_client", # For e2e tests.
"build~=1.2.2", # For e2e tests.
"apify-client", # For e2e tests.
"build~=1.2.2", # For e2e tests.
"mypy~=1.15.0",
"pre-commit~=4.2.0",
"proxy-py~=2.4.0",
@@ -144,7 +144,6 @@ ignore = [
"PLR0911", # Too many return statements
"PLR0913", # Too many arguments in function definition
"PLR0915", # Too many statements
"PTH", # flake8-use-pathlib
"PYI034", # `__aenter__` methods in classes like `{name}` usually return `self` at runtime
"PYI036", # The second argument in `__aexit__` should be annotated with `object` or `BaseException | None`
"S102", # Use of `exec` detected
@@ -166,6 +165,7 @@ indent-style = "space"
"F401", # Unused imports
]
"**/{tests}/*" = [
"ASYNC230", # Async functions should not open files with blocking methods like `open`
"D", # Everything from the pydocstyle
"INP001", # File {filename} is part of an implicit namespace package, add an __init__.py
"PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable
2 changes: 1 addition & 1 deletion src/crawlee/_cli.py
@@ -22,7 +22,7 @@
cli = typer.Typer(no_args_is_help=True)

template_directory = importlib.resources.files('crawlee') / 'project_template'
with open(str(template_directory / 'cookiecutter.json')) as f:
with (template_directory / 'cookiecutter.json').open() as f:
cookiecutter_json = json.load(f)

crawler_choices = cookiecutter_json['crawler_type']
14 changes: 3 additions & 11 deletions src/crawlee/_service_locator.py
@@ -3,8 +3,8 @@
from crawlee._utils.docs import docs_group
from crawlee.configuration import Configuration
from crawlee.errors import ServiceConflictError
from crawlee.events import EventManager
from crawlee.storage_clients import StorageClient
from crawlee.events import EventManager, LocalEventManager
from crawlee.storage_clients import FileSystemStorageClient, StorageClient


@docs_group('Classes')
@@ -49,8 +49,6 @@ def set_configuration(self, configuration: Configuration) -> None:
def get_event_manager(self) -> EventManager:
"""Get the event manager."""
if self._event_manager is None:
from crawlee.events import LocalEventManager

self._event_manager = (
LocalEventManager().from_config(config=self._configuration)
if self._configuration
@@ -77,13 +75,7 @@ def set_event_manager(self, event_manager: EventManager) -> None:
def get_storage_client(self) -> StorageClient:
"""Get the storage client."""
if self._storage_client is None:
from crawlee.storage_clients import MemoryStorageClient

self._storage_client = (
MemoryStorageClient.from_config(config=self._configuration)
if self._configuration
else MemoryStorageClient.from_config()
)
self._storage_client = FileSystemStorageClient()

self._storage_client_was_retrieved = True
return self._storage_client
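
The practical effect of this change: when no storage client has been registered, the service locator now falls back to `FileSystemStorageClient` instead of the previous in-memory default. A hedged sketch of opting back into in-memory storage; it assumes the locator exposes a `set_storage_client` setter symmetric to `set_event_manager`, which is not visible in this hunk:

```python
from crawlee import service_locator
from crawlee.storage_clients import MemoryStorageClient

# Register an in-memory client before any storage is retrieved; once
# `get_storage_client()` has been called, swapping services raises
# ServiceConflictError.
service_locator.set_storage_client(MemoryStorageClient())  # Assumed setter.

# Subsequent retrievals return the registered client rather than the new
# FileSystemStorageClient default.
storage_client = service_locator.get_storage_client()
```
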
6 changes: 1 addition & 5 deletions src/crawlee/_types.py
@@ -23,7 +23,7 @@
from crawlee.sessions import Session
from crawlee.storage_clients.models import DatasetItemsListPage
from crawlee.storages import KeyValueStore
from crawlee.storages._dataset import ExportToKwargs, GetDataKwargs
from crawlee.storages._types import ExportToKwargs, GetDataKwargs

# Workaround for https://github.com/pydantic/pydantic/issues/9445
J = TypeVar('J', bound='JsonSerializable')
@@ -275,10 +275,6 @@ async def push_data(
**kwargs: Unpack[PushDataKwargs],
) -> None:
"""Track a call to the `push_data` context helper."""
from crawlee.storages._dataset import Dataset

await Dataset.check_and_serialize(data)

self.push_data_calls.append(
PushDataFunctionCall(
data=data,
77 changes: 62 additions & 15 deletions src/crawlee/_utils/file.py
@@ -2,18 +2,26 @@

import asyncio
import contextlib
import io
import csv
import json
import mimetypes
import os
import re
import shutil
from enum import Enum
from logging import getLogger
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
from typing import Any, TextIO

from typing_extensions import Unpack

from crawlee.storages._types import ExportDataCsvKwargs, ExportDataJsonKwargs

logger = getLogger(__name__)


class ContentType(Enum):
@@ -83,28 +91,67 @@ def determine_file_extension(content_type: str) -> str | None:
return ext[1:] if ext is not None else ext


def is_file_or_bytes(value: Any) -> bool:
"""Determine if the input value is a file-like object or bytes.

This function checks whether the provided value is an instance of bytes, bytearray, or io.IOBase (file-like).
The method is simplified for common use cases and may not cover all edge cases.
async def json_dumps(obj: Any) -> str:
"""Serialize an object to a JSON-formatted string with specific settings.

Args:
value: The value to be checked.
obj: The object to serialize.

Returns:
True if the value is either a file-like object or bytes, False otherwise.
A string containing the JSON representation of the input object.
"""
return isinstance(value, (bytes, bytearray, io.IOBase))
return await asyncio.to_thread(json.dumps, obj, ensure_ascii=False, indent=2, default=str)


async def json_dumps(obj: Any) -> str:
"""Serialize an object to a JSON-formatted string with specific settings.
def infer_mime_type(value: Any) -> str:
"""Infer the MIME content type from the value.

Args:
obj: The object to serialize.
value: The value to infer the content type from.

Returns:
A string containing the JSON representation of the input object.
The inferred MIME content type.
"""
return await asyncio.to_thread(json.dumps, obj, ensure_ascii=False, indent=2, default=str)
# If the value is bytes (or bytearray), return binary content type.
if isinstance(value, (bytes, bytearray)):
return 'application/octet-stream'

# If the value is a dict or list, assume JSON.
if isinstance(value, (dict, list)):
return 'application/json; charset=utf-8'

# If the value is a string, assume plain text.
if isinstance(value, str):
return 'text/plain; charset=utf-8'

# Default fallback.
return 'application/octet-stream'


async def export_json_to_stream(
iterator: AsyncIterator[dict],
dst: TextIO,
**kwargs: Unpack[ExportDataJsonKwargs],
) -> None:
items = [item async for item in iterator]
json.dump(items, dst, **kwargs)


async def export_csv_to_stream(
iterator: AsyncIterator[dict],
dst: TextIO,
**kwargs: Unpack[ExportDataCsvKwargs],
) -> None:
writer = csv.writer(dst, **kwargs)
write_header = True

# Iterate over the dataset and write to CSV.
async for item in iterator:
if not item:
continue

if write_header:
writer.writerow(item.keys())
write_header = False

writer.writerow(item.values())
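
A usage sketch of the new stream-export helpers added here. They live in the internal `crawlee._utils.file` module, so the direct import below is illustrative; the production call sites (presumably the `Dataset` export methods) are outside this hunk:

```python
import asyncio
from pathlib import Path

from crawlee._utils.file import export_csv_to_stream, export_json_to_stream


async def iter_items():
    """Stand-in async iterator; in practice the items would come from a Dataset."""
    for item in ({'url': 'https://crawlee.dev', 'status': 200},
                 {'url': 'https://apify.com', 'status': 200}):
        yield item


async def main() -> None:
    # CSV: the header row is taken from the keys of the first non-empty item.
    with Path('results.csv').open('w', newline='') as dst:
        await export_csv_to_stream(iter_items(), dst)

    # JSON: the items are collected into a list and dumped as a single array.
    with Path('results.json').open('w') as dst:
        await export_json_to_stream(iter_items(), dst)


if __name__ == '__main__':
    asyncio.run(main())
```
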
18 changes: 2 additions & 16 deletions src/crawlee/configuration.py
@@ -118,21 +118,7 @@ class Configuration(BaseSettings):
)
),
] = True
"""Whether to purge the storage on the start. This option is utilized by the `MemoryStorageClient`."""

write_metadata: Annotated[bool, Field(alias='crawlee_write_metadata')] = True
"""Whether to write the storage metadata. This option is utilized by the `MemoryStorageClient`."""

persist_storage: Annotated[
bool,
Field(
validation_alias=AliasChoices(
'apify_persist_storage',
'crawlee_persist_storage',
)
),
] = True
"""Whether to persist the storage. This option is utilized by the `MemoryStorageClient`."""
"""Whether to purge the storage on the start. This option is utilized by the storage clients."""

persist_state_interval: Annotated[
timedelta_ms,
@@ -239,7 +225,7 @@ class Configuration(BaseSettings):
),
),
] = './storage'
"""The path to the storage directory. This option is utilized by the `MemoryStorageClient`."""
"""The path to the storage directory. This option is utilized by the storage clients."""

headless: Annotated[
bool,
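
With `persist_storage` and `write_metadata` removed, persistence is now decided by the choice of storage client, and purging remains the one storage-related toggle on `Configuration`. A minimal sketch of disabling it, assuming the field kept in this hunk is named `purge_on_start` (the field name itself sits just above the visible lines):

```python
from crawlee import service_locator
from crawlee.configuration import Configuration

# Keep data from previous runs instead of purging default storages at startup.
configuration = Configuration(purge_on_start=False)
service_locator.set_configuration(configuration)
```
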