[SPARK-51272][CORE] Aborting instead of re-submitting of partially completed indeterminate result stage #50630

Open · wants to merge 2 commits into base: master
Conversation

@attilapiros (Contributor) commented Apr 17, 2025

What changes were proposed in this pull request?

This PR aborts a partially completed indeterminate result stage instead of resubmitting it.

Why are the changes needed?

Compared to a shuffle map stage, a result stage has more output and more intermediate state:

  • It can use a FileOutputCommitter where each task does a Hadoop task commit. In case of a re-submit this will lead to re-committing that Hadoop task (possibly with different content).
  • In case of a JDBC write it may have already inserted all rows of a partition into the target schema.

As long as rollback of a result stage is not supported (https://issues.apache.org/jira/browse/SPARK-25342), the best we can do is abort the stage.

The existing code before this PR already tried to address a similar situation in the handling of FetchFailed when the fetch comes from an indeterminate shuffle map stage: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2178-L2182

But this is not enough, as a FetchFailed from a determinate stage can lead to an executor loss and a re-compute of the indeterminate parent of the result stage, as shown in the attached unit test.

Moreover, the FetchFailed can race with a successful CompletionEvent. This is why this PR detects the partial execution at the re-submission of the indeterminate result stage.
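
As a rough, self-contained sketch of the decision this PR introduces (the names below are made up for illustration and are not the actual DAGScheduler/ResultStage members):

final case class ResultStageState(
    isIndeterminate: Boolean,   // some RDD in the stage's lineage has indeterminate output
    finishedPartitions: Int,    // partitions already completed/committed by earlier attempts
    totalPartitions: Int)

def onResubmit(stage: ResultStageState): String = {
  val partiallyCompleted =
    stage.finishedPartitions > 0 && stage.finishedPartitions < stage.totalPartitions
  if (stage.isIndeterminate && partiallyCompleted) {
    // Committed partitions came from an older run of an indeterminate lineage;
    // re-running only the missing partitions could mix inconsistent outputs.
    "abort the stage"
  } else {
    "resubmit the missing partitions"
  }
}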

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit tests are created to illustrate the situation above.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions bot added the CORE label Apr 17, 2025
if (eventQueue.nonEmpty) {
  post(eventQueue.remove(0))
}
// `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially in the main test
@attilapiros (Contributor, Author) commented on this change:

This part is modified because, without it, I have seen the following in unit-tests.log (note the thread names: pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite vs. dag-scheduler-message):

25/04/17 14:15:05.815 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1662
25/04/17 14:15:05.815 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 2 missing tasks from ResultStage 2 (DAGSchedulerSuiteRDD 2) (first 15 tasks are for partitions Vector(0, 1))
25/04/17 14:15:05.816 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Marking ResultStage 2 () as failed due to a fetch failure from ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123)
25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ResultStage 2 () failed in 3 ms due to ignored
25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Resubmitting ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123) and ResultStage 2 () due to fetch failure
25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Executor lost: hostA-exec (epoch 3)
25/04/17 14:15:05.818 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Shuffle files lost for executor: hostA-exec (epoch 3)
25/04/17 14:15:06.023 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Resubmitting failed stages
25/04/17 14:15:06.024 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting ShuffleMapStage 1 (DAGSchedulerSuiteRDD 0), which has no missing parents
25/04/17 14:15:06.025 dag-scheduler-message INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.9 KiB, free 2.4 GiB)
25/04/17 14:15:06.025 dag-scheduler-message INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1825.0 B, free 2.4 GiB)
25/04/17 14:15:06.026 dag-scheduler-message INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1662
25/04/17 14:15:06.027 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 1 (DAGSchedulerSuiteRDD 0) (first 15 tasks are for partitions Vector(0))
25/04/17 14:15:06.028 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting ShuffleMapStage 0 (DAGSchedulerSuiteRDD 1), which has no missing parents
25/04/17 14:15:06.029 dag-scheduler-message INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KiB, free 2.4 GiB)
25/04/17 14:15:06.029 dag-scheduler-message INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1826.0 B, free 2.4 GiB)
25/04/17 14:15:06.030 dag-scheduler-message INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1662
25/04/17 14:15:06.030 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (DAGSchedulerSuiteRDD 1) (first 15 tasks are for partitions Vector(0, 1))
25/04/17 14:15:06.227 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ShuffleMapStage 0 (RDD at DAGSchedulerSuite.scala:123) finished in 198 ms
25/04/17 14:15:06.228 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: looking for newly runnable stages
25/04/17 14:15:06.228 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: running: HashSet(ShuffleMapStage 1)
25/04/17 14:15:06.229 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: waiting: HashSet(ResultStage 2)
25/04/17 14:15:06.229 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: failed: HashSet()
25/04/17 14:15:06.233 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123) finished in 209 ms

@attilapiros (Contributor, Author):
cc @mridulm

@attilapiros requested a review from Ngone51 April 18, 2025 00:59
@attilapiros (Contributor, Author):
The "java.lang.OutOfMemoryError: Java heap space" in the pyspark-pandas-connect-part2 is unrelated.

@attilapiros (Contributor, Author):
After the test was restarted, the error was resolved.

@mridulm (Contributor) commented Apr 18, 2025

It can use a FileOutputCommitter where each task does a Hadoop task commit. In case of a re-submit this will lead to re-committing that Hadoop task (possibly with different content).

Only unsuccessful (and so uncommitted) tasks are candidates for (re)execution (and so commit) - not completed tasks.
So if a partition has completed task commit, it won't be re-executed - Spark ensures this w.r.t. the use of FileOutputCommitter.
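
A minimal sketch of that guarantee (a hypothetical helper, not DAGScheduler code): on a re-submit, only the partitions that never completed a task commit are candidates for execution again.

def partitionsToResubmit(totalPartitions: Int, committedPartitions: Set[Int]): Seq[Int] =
  (0 until totalPartitions).filterNot(committedPartitions.contains)

// With 4 partitions where partitions 0 and 2 already committed:
// partitionsToResubmit(4, Set(0, 2)) == Seq(1, 3)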

In case of a JDBC write it may have already inserted all rows of a partition into the target schema.

As discussed here, this is a bug in the JDBC implementation - the txn commit should be done in a task commit, not as part of foreachPartition(savePartition).
It is not expected to work correctly in all scenarios, and in the case observed, it did end up failing.
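
Purely as an illustration of that distinction (a hypothetical interface, not Spark's JdbcUtils or commit protocol API): committing the DB transaction while writing the partition makes the side effect permanent even if the task attempt is later discarded, whereas deferring the commit to a task-commit step lets the scheduler decide which attempt wins.

trait PartitionJdbcWriter[T] {
  def writeRows(rows: Iterator[T]): Unit  // stage the rows, but do not make them durable yet
  def commitTask(): Unit                  // invoked only for the attempt the commit coordinator accepts
  def abortTask(): Unit                   // invoked for attempts that fail or lose the race
}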

But this is not enough, as a FetchFailed from a determinate stage can lead to an executor loss and a re-compute of the indeterminate parent of the result stage, as shown in the attached unit test.

The fix for this is to handle something similar to this.

I have sketched a rough impl here for reference (it is just illustrative! and to convey what I was talking about).
Assuming I am not missing anything, it should help you with fixing this issue.

  • Option 1 handles indeterminate impact when processing shuffle data loss.
  • Option 2 does this when computing an indeterminate stage.

Option 1 is much more aggressive with cleanup, but might spuriously kill jobs a lot more than required.
If option 2 is correct (and we would need to rigorously analyze it for correctness and completeness), I would prefer that - as it is much more conservative with aborting stages/failing jobs.

(I have adapted the tests you included in this PR for both - and they both pass)

@attilapiros (Contributor, Author):
@mridulm

Only unsuccessful (and so uncommitted) tasks are candidates for (re)execution (and so commit) - not completed tasks.
So if a partition has completed task commit, it won't be re-executed - Spark ensures this w.r.t. the use of FileOutputCommitter.

But that's also bad for an indeterminate stage, as the data becomes inconsistent: the committed partitions come from a previous, older computation and not from the latest one, while the resubmitted ones come from the new one.

To illustrate it:

scala> import org.apache.spark.sql.functions.udf
scala> val myudf = udf(() => { val rnd = new java.util.Random(); rnd.nextInt(10)}).asNondeterministic()
scala> spark.udf.register("myudf", myudf)
scala> val df = sql("SELECT rand, count(rand) as cnt from (SELECT myudf() as rand from explode(sequence(1, 1000))) GROUP BY rand")
scala> df.show
+----+---+
|rand|cnt|
+----+---+
|   1|122|
|   6|110|
|   3|111|
|   5| 85|
|   9| 99|
|   4| 94|
|   8| 93|
|   7| 88|
|   2| 98|
|   0|100|
+----+---+
scala> df.selectExpr("sum(cnt)").show
+--------+
|sum(cnt)|
+--------+
|    1000|
+--------+

So if we write the df to a table and some but not all tasks were successful and a resubmit happened, we might get an inconsistent result where sum(cnt) won't be 1000 when we load the data back, as the resubmit might rerun the shuffle map stage, which regenerates the random values with a different distribution of the values from 0 to 9. The complete shuffle map stage is re-executed but the result stage is not.
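
A hypothetical continuation of the example (the path and write below are made up for illustration, using standard DataFrame APIs): this is the kind of write whose partial completion plus a re-submit of the indeterminate lineage could mix rows coming from two different random draws.

df.write.mode("overwrite").parquet("/tmp/indeterminate_demo")
// After such a partial failure and resubmission,
// spark.read.parquet("/tmp/indeterminate_demo").selectExpr("sum(cnt)") may no longer be 1000.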

@mridulm (Contributor) commented Apr 18, 2025

But that's also bad for an indeterminate stage, as the data becomes inconsistent: the committed partitions come from a previous, older computation and not from the latest one, while the resubmitted ones come from the new one.

If the parent map stage was indeterminate, existing Spark code would have already aborted the stage if there was a fetch failure for that parent stage.
If the parent is determinate, why does it matter which attempt the data came from? The input to a partition of the result stage will always be the same (except perhaps for ordering).

As you have pointed out with the test in this PR, there is a gap in the existing impl: when there is a shuffle loss due to executor/host failure (and not detected through a fetch failure), the check for determinism was not being performed before recomputing the lost data. So if shuffle files are lost for an indeterminate stage but this never resulted in a FetchFailed (in the test, submitMissingParents for the result stage would have recomputed the indeterminate stage after it recomputed the determinate parent), the check to abort its child stages was not being done.
This is indeed a bug which needs to be addressed, and I have proposed two options for it. Would be great if you could take a look!

But that does not require failing the result stage - even if it is indeterminate - if no indeterminate parent has lost any shuffle outputs.

So if we write the df to a table and some but not all tasks were successful and a resubmit happened, we might get an inconsistent result where sum(cnt) won't be 1000 when we load the data back, as the resubmit might rerun the shuffle map stage, which regenerates the random values with a different distribution of the values from 0 to 9. The complete shuffle map stage is re-executed but the result stage is not.

This will not happen - please see above.

"some but not all tasks was successful and a resubmit happened" -> if it results in reexecution of the parent (indeterminate) stage through a fetch failure, job will be aborted.
If it does not result in re-execution of parent stage - the computation is deterministic - it is essentially: "WITH foo AS (SELECT key, count(key) as cnt FROM constant_table GROUP BY rand) SELECT SUM(cnt) FROM foo" - which will always give same result.

Please do let me know if I am missing some nuance.
If there is a test case to illustrate, that would be great!

(Edited to hopefully improve clarity!)

@mridulm (Contributor) commented Apr 24, 2025

@mridulm regarding option 2, why a return is enough here (and not an abortStage):

It should result in the same behavior (all jobs this stage was part of have been aborted in that scenario, and we have not added the stage to runningStages yet).
Having said that, it is indeed better to explicitly abort it, even if it does not do much right now, so that we are more robust to code evolution in the future.

Why we need to check whether all jobs should be aborted, and not just one, here:

A stage can be part of multiple concurrent jobs, and not all of them might be getting aborted: some of them might not have started a result stage, and so are recoverable.
Only if all of them have been aborted can we abort the stage.
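
An illustrative form of that check (hypothetical names, not the actual DAGScheduler bookkeeping): a stage shared by several concurrent jobs may only be aborted once every job that depends on it has itself been aborted.

def canAbortSharedStage(jobsUsingStage: Set[Int], abortedJobs: Set[Int]): Boolean =
  jobsUsingStage.nonEmpty && jobsUsingStage.subsetOf(abortedJobs)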
