
Better KVS streaming support #2929


Open
barjin opened this issue Apr 14, 2025 · 0 comments
Labels
feature Issues that represent new features or improvements to existing features. t-tooling Issues with this label are in the ownership of the tooling team.

Comments

@barjin
Contributor

barjin commented Apr 14, 2025

Which package is the feature request for? If unsure which one to select, leave blank

None

Feature

While the current KVS implementation can work with Node.js streams (e.g. in the setValue method), the interface leaves a lot to be desired.

See, e.g., this FileDownload example. Without KVS, the streamHandler can be implemented neatly as a single pipeline:

```javascript
import { pipeline } from 'node:stream/promises';
import { FileDownload } from '@crawlee/http';
import fs from 'node:fs';

// Web TransformStream that upper-cases already-decoded text chunks.
function UpperCaseStream() {
    return new TransformStream({
        transform(chunk, controller) {
            controller.enqueue(chunk.toUpperCase());
        },
    });
}

const crawler = new FileDownload({
    async streamHandler({ stream }) {
        // Response body -> text -> upper case -> file, all in one pipeline.
        await pipeline(
            stream,
            new TextDecoderStream(),
            UpperCaseStream(),
            fs.createWriteStream('output.txt'),
        );
    },
});

await crawler.run(['https://example.com']);
```

If we want to write to the KVS instead of the filesystem, though, the API gets in the way:

```diff
+import { Transform } from 'node:stream';
+
 const crawler = new FileDownload({
     async streamHandler({ stream, getKeyValueStore }) {
+        const kvs = await getKeyValueStore();
+
+        // Pass-through Transform: the pipeline writes into it, setValue() reads from it.
+        const consumer = new Transform({
+            transform(chunk, _, callback) {
+                callback(null, chunk);
+            },
+        });
+
+        await Promise.all([
             pipeline(
                 stream,
                 new TextDecoderStream(),
                 UpperCaseStream(),
+                consumer,
             ),
+            kvs.setValue(
+                'test',
+                consumer,
+                { contentType: 'text/plain' },
+            ),
         ]);
     },
 });
```

Interleaving the async/await model with the stream interfaces feels unergonomic.

This is mostly because **while kvs.setValue supports streams, it accepts a stream instance as a parameter** (instead of providing a Writable that could be piped into or used as a pipeline step).
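Until KVS offers such a Writable, the `Promise.all` plumbing can at least be hidden behind a small adapter. This is a minimal sketch that assumes only that `setValue(key, readable, options)` consumes a Node.js Readable and returns a promise. `createKvsWritable` and `InMemoryKvs` are hypothetical names, and the in-memory class is a stand-in for a real KeyValueStore, not Crawlee's implementation.

```javascript
const { PassThrough, Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical in-memory stand-in for a KeyValueStore. It only assumes that
// setValue(key, readable, options) consumes a Readable and returns a promise.
class InMemoryKvs {
  constructor() {
    this.records = new Map();
  }

  async setValue(key, value, options = {}) {
    const chunks = [];
    for await (const chunk of value) chunks.push(Buffer.from(chunk));
    this.records.set(key, { value: Buffer.concat(chunks), contentType: options.contentType });
  }
}

// Hypothetical adapter: exposes setValue() as a Writable pipeline step.
// Data written to the Writable is forwarded to an internal PassThrough that
// setValue() reads from; the Writable finishes only after setValue() resolves.
function createKvsWritable(kvs, key, options) {
  const source = new PassThrough();
  const upload = kvs.setValue(key, source, options);

  const sink = new Writable({
    write(chunk, _encoding, callback) {
      // Respect backpressure: wait for 'drain' if the PassThrough is full.
      if (source.write(chunk)) callback();
      else source.once('drain', () => callback());
    },
    final(callback) {
      // Signal end of data, then wait for setValue() to finish storing it.
      source.end();
      upload.then(() => callback(), callback);
    },
    destroy(err, callback) {
      source.destroy(err);
      callback(err);
    },
  });

  // If setValue() fails mid-stream, fail the Writable (and thus the pipeline).
  upload.catch((err) => sink.destroy(err));
  return sink;
}

// Usage: the KVS write becomes an ordinary pipeline step.
async function demo() {
  const kvs = new InMemoryKvs();
  await pipeline(
    Readable.from(['hello, ', 'kvs']),
    createKvsWritable(kvs, 'test', { contentType: 'text/plain' }),
  );
  return kvs.records.get('test');
}

demo().then((record) => console.log(record.contentType, record.value.toString('utf8')));
```

With an adapter like this, the streamHandler could go back to a single call: `await pipeline(stream, new TextDecoderStream(), UpperCaseStream(), createKvsWritable(kvs, 'test', { contentType: 'text/plain' }))`.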

KVS.getValue currently doesn't seem to support streams at all.

Motivation

Simplify work with large files in the Crawlee / Apify ecosystem.

Ideal solution or implementation, and any additional constraints

Not sure. Perhaps adding methods like

KeyValueStore.getReadableStream(key: string): Readable

and

KeyValueStore.getWritableStream(key: string, options: { contentType: string }): Writable

would be sufficient.
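To make the proposal a bit more concrete, here is a sketch of how the two methods could behave, implemented on a hypothetical in-memory store. `InMemoryStreamingKvs` is illustrative only and buffers whole records, which a real implementation (streaming to disk or the Apify API) would avoid. What matters is the shape: both methods return plain Node.js streams, so a KVS record can sit at either end of a `pipeline`.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical in-memory store implementing the two proposed methods.
// Records are buffered in memory only to keep the sketch self-contained.
class InMemoryStreamingKvs {
  constructor() {
    this.records = new Map();
  }

  // Proposed: KeyValueStore.getReadableStream(key: string): Readable
  getReadableStream(key) {
    const record = this.records.get(key);
    if (!record) {
      // Report a missing key as a stream error rather than throwing synchronously.
      return new Readable({
        read() {
          this.destroy(new Error(`Record "${key}" not found`));
        },
      });
    }
    return Readable.from([record.value]);
  }

  // Proposed: KeyValueStore.getWritableStream(key: string, options: { contentType: string }): Writable
  getWritableStream(key, { contentType }) {
    const chunks = [];
    const records = this.records;
    return new Writable({
      write(chunk, _encoding, callback) {
        chunks.push(chunk);
        callback();
      },
      final(callback) {
        // Commit the record only once all data has been written.
        records.set(key, { value: Buffer.concat(chunks), contentType });
        callback();
      },
    });
  }
}

// Node.js counterpart of UpperCaseStream (assumes chunks never split a multi-byte character).
const upperCase = () =>
  new Transform({
    transform(chunk, _encoding, callback) {
      callback(null, chunk.toString('utf8').toUpperCase());
    },
  });

async function demo() {
  const kvs = new InMemoryStreamingKvs();

  // Write: source -> transform -> KVS record, as one pipeline.
  await pipeline(
    Readable.from(['hello from ', 'the kvs']),
    upperCase(),
    kvs.getWritableStream('test', { contentType: 'text/plain' }),
  );

  // Read: KVS record -> any consumer (here, collected into a string).
  let text = '';
  await pipeline(kvs.getReadableStream('test'), async (source) => {
    for await (const chunk of source) text += chunk;
  });
  return text;
}

demo().then((text) => console.log(text));
```

Returning the streams synchronously keeps the methods composable; failures such as a missing key surface through the stream and reject the surrounding `pipeline` instead of being thrown at call time.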

This might also require adding the same methods to the client (?) and/or the SDK.

Alternative solutions or implementations

For a stream-enabled setValue that plugs into a pipeline, see the workaround above.

For a stream-enabled getValue (on Apify), as far as I know, users have to call the API directly with an HTTP client.
I don't currently know of any hack for streaming records from the FS-backed memory-storage KVS.
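A rough sketch of that direct-API workaround for Apify-hosted stores, assuming Apify's public `GET /v2/key-value-stores/{storeId}/records/{recordKey}` endpoint with a Bearer token, and Node.js 18+ for the global `fetch` and `Readable.fromWeb`. `apifyRecordUrl` and `streamApifyRecord` are hypothetical helper names, not part of Crawlee, the Apify SDK, or apify-client.

```javascript
const { Readable } = require('node:stream');

// Record URL for Apify's "get record" endpoint; store ID and key are URL-encoded.
function apifyRecordUrl(storeId, recordKey) {
  return `https://api.apify.com/v2/key-value-stores/${encodeURIComponent(storeId)}/records/${encodeURIComponent(recordKey)}`;
}

// Hypothetical helper: fetches a record and exposes the response body as a
// Node.js Readable, so the record is streamed instead of buffered in memory.
async function streamApifyRecord(storeId, recordKey, token) {
  const response = await fetch(apifyRecordUrl(storeId, recordKey), {
    headers: { Authorization: `Bearer ${token}` },
  });
  if (!response.ok || !response.body) {
    throw new Error(`Failed to fetch record "${recordKey}": HTTP ${response.status}`);
  }
  // Convert the WHATWG ReadableStream body into a Node.js Readable.
  return Readable.fromWeb(response.body);
}
```

The returned Readable can then be the first step of a pipeline, e.g. `await pipeline(await streamApifyRecord(storeId, 'my-file', token), fs.createWriteStream('my-file'))`, where `storeId` and `token` are placeholders.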

Other context

No response

@barjin barjin added the feature Issues that represent new features or improvements to existing features. label Apr 14, 2025
@github-actions github-actions bot added the t-tooling Issues with this label are in the ownership of the tooling team. label Apr 14, 2025
Projects
None yet
Development

No branches or pull requests

1 participant