Optimizing PySpark queries with Adaptive Query Execution (AQE) – example included


Spark 3 introduced numerous enhancements, and one of the most notable is Adaptive Query Execution (AQE). AQE optimizes Spark SQL queries in real time, using statistics gathered while the query runs rather than only estimates made before it starts. This article explores the capabilities of AQE and illustrates its application with an example. By understanding how AQE works and enabling it correctly, developers and data engineers can ensure efficient query execution and better resource utilization, improving the overall Spark experience.

Understanding Adaptive Query Execution (AQE):

AQE optimizes query plans using runtime statistics rather than only the static statistics collected before execution. Because shuffle stages materialize intermediate results, Spark can measure their actual sizes at runtime and adjust the remaining physical plan accordingly, which compensates for inaccurate or missing static statistics.
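
To make the contrast concrete, the snippet below is a minimal sketch of how static statistics are gathered ahead of time with ANALYZE TABLE; the table name orders_static is illustrative only. AQE, by contrast, reads the actual sizes of shuffle outputs while the query runs, so it needs no such preparation.

from pyspark.sql import SparkSession
# Minimal sketch: static statistics are computed once, before any query runs
# (Hive support gives saveAsTable / ANALYZE TABLE a catalog to write to)
spark = SparkSession.builder.appName("Static stats sketch").enableHiveSupport().getOrCreate()
spark.range(1_000_000).write.mode("overwrite").saveAsTable("orders_static")  # illustrative table
spark.sql("ANALYZE TABLE orders_static COMPUTE STATISTICS")                  # table-level stats, collected up front
spark.sql("DESCRIBE EXTENDED orders_static").filter("col_name = 'Statistics'").show(truncate=False)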

Key Features of AQE:

  1. Coalesce Shuffle Partitions: Dynamically coalesces post-shuffle partitions based on the actual data size, reducing the number of output partitions and consequently optimizing the subsequent stage.
  2. Optimize Skew Joins: Identifies and handles skewed join keys by dividing skewed keys into smaller tasks, thereby avoiding resource overuse on skewed tasks.
  3. Converting Sort-Merge Join to Broadcast Join: If one side of the join turns out to be much smaller at runtime, AQE can dynamically switch to a broadcast join, reducing computation time (the configuration keys behind these behaviours are sketched after this list).
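
Each of these behaviours is controlled by its own configuration keys. The snippet below is a minimal sketch of the settings most commonly tuned; the values shown are example values only, and exact defaults vary between Spark 3.x releases.

from pyspark.sql import SparkSession
# Sketch of the AQE-related settings behind the three features above (example values only)
spark = SparkSession.builder.appName("AQE config sketch").getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "true")                             # master switch for AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")          # 1. coalesce small post-shuffle partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")        # target size when coalescing partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")                    # 2. split skewed join partitions into smaller tasks
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")  # size above which a partition counts as skewed
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")                   # 3. size below which a join side may be broadcast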

Example:

Let’s go through a simple example to understand how AQE can optimize Spark SQL queries. Suppose we have two DataFrames, orders and customers, with some dummy data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize spark session
spark = SparkSession.builder.appName("AQE Example").enableHiveSupport().getOrCreate()
# Enable AQE - Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Create dataframes
orders_data = [("order1", "customer1", 1000), ("order2", "customer2", 2000), ("order3", "customer3", 3000)]
customers_data = [("customer1", "Rajesh"), ("customer2", "Sachin"), ("customer3", "Ambani")]
orders_df = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount"])
customers_df = spark.createDataFrame(customers_data, ["customer_id", "customer_name"])
# Perform join operation
result_df = orders_df.join(customers_df, "customer_id").filter(col("amount") > 1500)
result_df.show()

Output

+-----------+--------+------+-------------+
|customer_id|order_id|amount|customer_name|
+-----------+--------+------+-------------+
|  customer2|  order2|  2000|       Sachin|
|  customer3|  order3|  3000|       Ambani|
+-----------+--------+------+-------------+

In this example, we enabled AQE by setting the spark.sql.adaptive.enabled configuration to true. We then joined the orders_df and customers_df DataFrames and filtered the result. AQE works behind the scenes to optimize this query based on the actual intermediate data.
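
Note that from Spark 3.2 onwards AQE is enabled by default, so the explicit conf.set above mainly serves to make the behaviour explicit. You can confirm the current setting on your session at any time:

# Quick check of the AQE flag on the current session
print(spark.conf.get("spark.sql.adaptive.enabled"))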

Inspecting the AQE plan:

To see how Spark plans the query under AQE, you can print the DataFrame's query execution plans:

print(result_df._jdf.queryExecution().toString())

Parsed, Analyzed, Optimized and Physical Plans
== Parsed Logical Plan ==
'Filter ('amount > 1500)
+- Project [customer_id#94, order_id#93, amount#95L, customer_name#100]
   +- Join Inner, (customer_id#94 = customer_id#99)
      :- LogicalRDD [order_id#93, customer_id#94, amount#95L], false
      +- LogicalRDD [customer_id#99, customer_name#100], false

== Analyzed Logical Plan ==
customer_id: string, order_id: string, amount: bigint, customer_name: string
Filter (amount#95L > cast(1500 as bigint))
+- Project [customer_id#94, order_id#93, amount#95L, customer_name#100]
   +- Join Inner, (customer_id#94 = customer_id#99)
      :- LogicalRDD [order_id#93, customer_id#94, amount#95L], false
      +- LogicalRDD [customer_id#99, customer_name#100], false

== Optimized Logical Plan ==
Project [customer_id#94, order_id#93, amount#95L, customer_name#100]
+- Join Inner, (customer_id#94 = customer_id#99)
   :- Filter ((isnotnull(amount#95L) AND (amount#95L > 1500)) AND isnotnull(customer_id#94))
   :  +- LogicalRDD [order_id#93, customer_id#94, amount#95L], false
   +- Filter isnotnull(customer_id#99)
      +- LogicalRDD [customer_id#99, customer_name#100], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [customer_id#94, order_id#93, amount#95L, customer_name#100]
   +- SortMergeJoin [customer_id#94], [customer_id#99], Inner
      :- Sort [customer_id#94 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(customer_id#94, 200), ENSURE_REQUIREMENTS, [plan_id=527]
      :     +- Filter ((isnotnull(amount#95L) AND (amount#95L > 1500)) AND isnotnull(customer_id#94))
      :        +- Scan ExistingRDD[order_id#93,customer_id#94,amount#95L]
      +- Sort [customer_id#99 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(customer_id#99, 200), ENSURE_REQUIREMENTS, [plan_id=528]
            +- Filter isnotnull(customer_id#99)
               +- Scan ExistingRDD[customer_id#99,customer_name#100]
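
Notice the AdaptiveSparkPlan isFinalPlan=false line: before the query has actually run, Spark still shows the initial sort-merge join plan. AQE re-plans during execution, so after an action has completed you can ask for the plan again, for example through the public DataFrame.explain() API, and you will typically see isFinalPlan=true and, with a small table like customers_df, the sort-merge join replaced by a broadcast hash join. The exact final plan depends on your data sizes and configuration, so treat the snippet below as a sketch.

# Trigger execution; AQE finalizes the plan while the job runs
result_df.collect()
# The public explain() API now reflects the adaptive plan (isFinalPlan=true)
result_df.explain()
# For all plans (parsed / analyzed / optimized / physical), use the extended form
result_df.explain(True)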