Skip to content

[SPARK-51823][SS] Add config to not persist state store on executors #50612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

Kimahriman
Copy link
Contributor

@Kimahriman Kimahriman commented Apr 17, 2025

What changes were proposed in this pull request?

Adds a new state store config unloadOnCommit that unloads the state store instance from the executor at task completion. This frees up resources on the executor and prevents potentially unbounded resource usage from continually adding more state store instances to a single executor.

A task completion listener will execute a synchronous maintenance followed by a close on the state store. Since we do the maintenance synchronously, we never need to start the background maintenance thread.

Why are the changes needed?

Stateful streams can have trouble scaling to large volumes of data without also increasing the total resources allocated to the application. By unloading state stores on task completion, stateful streams are able to complete with fewer resources, at the cost of slightly higher latency per batch in certain scenarios.

Does this PR introduce any user-facing change?

Yes, adds a new config for changing the behavior of stateful streams.

How was this patch tested?

New UT is added to show the config takes effect. I'm not sure what all corner cases may need to be tested with this.

Was this patch authored or co-authored using generative AI tooling?

No

Copy link
Contributor

@micheal-o micheal-o left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kimahriman Thanks for submitting this change. I just took a quick look. Please can you share more on the motivation for this and your use case? I would like to understand the issue you observed, the type of stateful query you ran, the state store provider you used and your cluster setup.

@Kimahriman
Copy link
Contributor Author

@Kimahriman Thanks for submitting this change. I just took a quick look. Please can you share more on the motivation for this and your use case? I would like to understand the issue you observed, the type of stateful query you ran, the state store provider you used and your cluster setup.

There's a little more information in the jira issue. The quick background is we do relatively large streaming deduplications and streaming aggregations (total state size can be in the 10s to 100s of TiB) with up to 10s of thousands of partitions. We've been dealing with issues related to this for a long time, and over time some fixes come out to make this situation better, but at the end of the day they are mostly band-aids to this type of scenario. We use the RocksDB state store for most things, and use bounded memory to limit resource utilization.

This is the result of finally digging into why some of our partitions were taking over an hour to create a RocksDB snapshot to upload. This led us to find a lot of things potentially contributing to this:

  • The level-0 cache is pinned for all opened RocksDB instances on an executor. This can easily be several hundred on a single executor, and all that memory can't be freed even when those instances are not being used. This could be fixed by not pinning the level-0 cache
  • There seemed to be contention for background compaction, as we would see the checkpoint process start, and then nothing happen for that partition for an hour, and then finally compaction kick in and the checkpoint successfully created. This could be improved by increasing background threads

But at the end of the day these are all workarounds to the problem that the existing stateful streaming approach doesn't work well with high-latency, high-volume queries, it's more designed around low-latency, low-volume queries. Additionally, we use a dynamic allocation setup, so it is very likely most of our executors will be deallocated before the next batch runs, so keeping the state stores open does nothing but waste resources.

This change would also help the HDFSBackedStateStore have more use cases again and help some people avoid the added complexity of using RocksDB just to deal with all the state stores being kept on a small number of executors.

@anishshri-db
Copy link
Contributor

@Kimahriman - if you are removing/adding executors per batch, then locality probably is not very useful. But I'm curious about the perf diff you see with large state (especially as the large state grows) - I guess it might not matter a whole lot - because even today - you are doing a fresh pull for each batch ?

@Kimahriman
Copy link
Contributor Author

Kimahriman commented Apr 17, 2025

@Kimahriman - if you are removing/adding executors per batch, then locality probably is not very useful.

Yeah this includes not even reporting to the coordinator being active since that's just used for locality.

But I'm curious about the perf diff you see with large state (especially as the large state grows) - I guess it might not matter a whole lot - because even today - you are doing a fresh pull for each batch ?

Yeah generally for us there's no performance drop since many of our executors will end up get deallocated between batches anyway, so we have to redownload the state each batch regardless. The long pole in the tent for us is generally the time it takes to create and upload a checkpoint. This is partially due to issues where a checkpoint is generally created every batch for RocksDB even with the changelog enabled, because of the hard coded 10k row check as well as not initializing the latest snapshot version on a fresh load (both of which appear to be fixed for the 4.0 release)

Also I will probably try to change back to the HDFSBackedStateStore for some of our jobs and see how it goes since this effectively removes the requirement that "all state must fit in memory at once"

@anishshri-db
Copy link
Contributor

@Kimahriman - yea and with 4.0, the only additional cost for doMaintenance would be around uploading the snapshot and deleting old versions. Both likely won't be expensive on a per batch basis (mostly a no-op for most batches) but you could see some spikes in latency when both these operations are performed. (I guess thats acceptable in your case rather than keeping the resources running)

@Kimahriman
Copy link
Contributor Author

@Kimahriman - yea and with 4.0, the only additional cost for doMaintenance would be around uploading the snapshot and deleting old versions. Both likely won't be expensive on a per batch basis (mostly a no-op for most batches) but you could see some spikes in latency when both these operations are performed. (I guess thats acceptable in your case rather than keeping the resources running)

Yeah I thought about trying to use the background maintenance to clean up the state but that just seemed hacky and race condition prone, and I'm already saying I care less about latency in this mode

buildConf("spark.sql.streaming.stateStore.unloadOnCommit")
.internal()
.doc("When true, Spark will synchronously run maintenance and then close each StateStore " +
"instance on task completion. This reduce overhead involved in keeping every StateStore " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: reduces overhead in

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated a little more with the doc

@@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {
}
}

test("SPARK-XXXXX: unload on commit") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the actual SPARK ticket number here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops yeah fixed

@@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {
}
}

test("SPARK-51823: unload on commit") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add an integration test under RocksDBStateStoreIntegrationSuite with the config enabled ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a basic integration test, let me know if there's anything you want to add to it

Copy link
Contributor

@anishshri-db anishshri-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM pending comment


// StateStore should be unloaded, so its tmp dir shouldn't exist
for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) {
assert(!file.getName().startsWith("StateStore"))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof this only works if I run this test by itself. When running the whole Suite, all the other tests leave StateStore data on disk so this check sees those

Copy link
Contributor Author

@Kimahriman Kimahriman Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by adding an afterEach to clear the state store in this suite. The parent StreamTest only does an afterAll to clear the state store, not sure if that one should just be updated to afterEach instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still doesn't work, too many other suites not cleaning up after themselves, need to think about how else to verify

@Kimahriman Kimahriman force-pushed the state-store-unload-on-commit branch from c0f68db to f86bd3f Compare April 20, 2025 00:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants