In this article, we delve deep into Spark 3's AQE framework, focusing on the coalesce and caching mechanisms of its shuffle partitions. Using an illustrative example, we highlight AQE's impacts on caching, reveal some unexpected outcomes, and propose solutions for optimizing cache utilization in the context of AQE.
Spark AQE in action
Spark 3's Adaptive Query Execution (AQE) is a framework that improves query performance and resource utilization in Apache Spark. It has three key features:
1. Optimization of skewed partitions: AQE detects and redistributes skewed partitions during joins to ensure more balanced partitioning. This results in improved workload parallelism and reduced processing time of joins, leading to better overall performance and optimized resource utilization.
2. Dynamic change of join strategy: AQE allows for a dynamic change of the join strategy during query execution. Spark 3 selects the most suitable join algorithm based on factors such as dataset size, available memory, and data distribution.
3. Dynamic coalescing of shuffle partitions: AQE merges small shuffle partitions at runtime, avoiding the shuffle of unnecessary data across the cluster. This reduces computational overhead, optimizes network usage, and results in faster query processing and more efficient use of cluster resources.
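The coalescing behavior described above is driven by a handful of configuration keys. As a sketch (the values shown are illustrative examples, not recommendations, and `spark` is assumed to be an existing SparkSession, as in a Databricks notebook):

```scala
// Enable AQE and its partition-coalescing feature
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Target size AQE aims for when merging small shuffle partitions (example value)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
```

Note that on recent Spark versions both flags are already enabled by default; setting them explicitly simply makes the experiment reproducible.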
In this blog post, my objective is to provide an in-depth exploration of the automatic coalesce mechanism for shuffle partitions, specifically focusing on its functionality within the context of cached datasets.
Below is some sample code used to demonstrate the mechanism.
Code 1. Creation of example dataset.
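The original listing is not reproduced here; a minimal Scala sketch consistent with the description (column names and values are illustrative assumptions) could look like this:

```scala
// Assuming a SparkSession named `spark`, as in a Databricks notebook
import spark.implicits._

// A small DataFrame with only two distinct rows, duplicated several times
val df = Seq(
  ("a", 1), ("a", 1), ("a", 1),
  ("b", 2), ("b", 2)
).toDF("key", "value")

// dropDuplicates introduces a shuffle; show() triggers the computation
df.dropDuplicates().show()
```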
The code generates a sample DataFrame, eliminates duplicates with the .dropDuplicates function, and triggers the computation with the .show() action.
Upon inspecting the SparkUI:
It becomes apparent that the default 200 shuffle partitions do not enhance parallelism, as the majority of them are empty.
To observe any differences, we will enable AQE (Adaptive Query Execution).
Code 2. AQE enabled.
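AQE is controlled by a single configuration flag. A sketch of enabling it explicitly (assuming a SparkSession named `spark`):

```scala
// Turn on Adaptive Query Execution for this session
spark.conf.set("spark.sql.adaptive.enabled", "true")
```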
Our query plan should look like this:
Code 3. Physical plan with AQE enabled.
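The plan output itself is not reproduced here; it can be printed with explain(), for example:

```scala
// Before the action runs, explain() shows AdaptiveSparkPlan with isFinalPlan=false;
// after an action such as show(), it reflects the final, re-optimized plan
df.dropDuplicates().explain()
```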
We can easily see the "runtime statistics" and the AQEShuffleRead stage that AQE has added.
We can confirm this by checking Spark UI. Currently, our partitions appear as follows:
The previous setup of two stages, with 2 partitions in the first and 200 in the second, has been replaced by a configuration in which the first stage still has 2 partitions but the second has just 1. AQE made this possible: Spark analyzed the runtime statistics and determined that a single partition was sufficient to process 2 records.
Influence of dataset caching on Adaptive Query Execution
Even in this basic example, the advantages of utilizing AQE are evident.
Now let's observe the effect of incorporating caching into our code.
Code 4. Cache added.
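The change amounts to inserting .cache() before the action. A sketch, continuing the earlier example:

```scala
// Cache the deduplicated dataset before triggering the computation
val deduped = df.dropDuplicates().cache()
deduped.show()
```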
This time we ended up with six stages, two of which contain multiple empty partitions. This was not the outcome we anticipated.
Let's check the plan:
Code 5. Physical plan with AQE enabled with caching.
It appears that there are no AQE-related stages here, indicating that Spark may have overlooked the option for plan optimization through AQE. How can we prompt Spark to utilize AQE?
How to enable AQE for cached datasets?
This issue has been addressed in one of the tickets on Spark's Jira platform, and has been resolved by adding a new parameter:

spark.sql.optimizer.canChangeCachedPlanOutputPartitioning

When set to true, this parameter enables Catalyst to use AQE for cached datasets. Its default value is false.
However, there is no information available in the official Spark documentation about this parameter, so I have relied on the information found in the repository.
Code 6. Description of canChangeCachedPlanOutputPartitioning parameter from source code documentation.
Let's test its functionality in practice by adding one line to our test code:
Code 7. Parameter canChangeCachedPlanOutputPartitioning parameter set to true.
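The change is a single configuration line, using the full key of the parameter (assuming a SparkSession named `spark`):

```scala
// Allow AQE to change the output partitioning of cached plans
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
```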
The result in SparkUI is as follows:
We have optimized the number of partitions and now only have one instead of multiple empty ones. Let's check the plan to ensure that AQE is functioning properly.
Code 8. Physical plan with AQE enabled with caching and canChangeCachedPlanOutputPartitioning set to true.
As you can see, the physical plan description for cached datasets is less detailed compared to those that are not cached. Nonetheless, the presence of the AdaptiveSparkPlan in line (9) is the crucial indication that AQE has been implemented.
Spark 3 AQE: Enhancements and Cache Configuration Tips
Spark 3's Adaptive Query Execution offers several advantages for data processing workflows. It optimizes shuffle operations, dynamically adjusts query plans, and uses runtime statistics to significantly improve query performance, reduce computational overhead, and maximize cluster resources. However, as detailed in the above post, AQE does not operate on cached datasets by default. In Spark 3.2 and higher you need to set

spark.sql.optimizer.canChangeCachedPlanOutputPartitioning

to true if you want to use this feature alongside caching.
Please keep in mind that this parameter was introduced in Spark 3.2, so if you are using any older version of Spark you cannot use AQE for cached datasets — in this case it is even more important to carefully consider which datasets should be stored in cache.
In Spark 3.5 this parameter is set to true by default. To restore the previous behavior, set spark.sql.optimizer.canChangeCachedPlanOutputPartitioning to false (you can find more details in the Spark 3.5 SQL migration guide).
Disclaimer: The test platform used for this example was a Databricks community cluster with version 12.2 LTS, which includes Apache Spark 3.3.2 and Scala 2.12.