PySpark: Understanding Joins in PySpark using the DataFrame API


Apache Spark, a fast, general-purpose cluster computing system, provides high-level APIs in Java, Scala, Python, and R, along with an optimized engine that supports general computation graphs. Among the many capabilities PySpark offers is the ability to perform several types of join operations on datasets.

This article will explore how to perform the following types of join operations in PySpark using the DataFrame API:

  • Inner Join
  • Left Join
  • Right Join
  • Full Outer Join
  • Left Semi Join
  • Left Anti Join
  • Joins with Multiple Conditions

To illustrate these join operations, we will use two sample data frames – ‘freshers_personal_details’ and ‘freshers_academic_details’.

Sample Data

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('JoinExample').getOrCreate()
freshers_personal_details = spark.createDataFrame([
    ('1', 'Sachin', 'New York'),
    ('2', 'Shekar', 'Bangalore'),
    ('3', 'Antony', 'Chicago'),
    ('4', 'Sharat', 'Delhi'),
    ('5', 'Vijay', 'London'),
], ['Id', 'Name', 'City'])
freshers_academic_details = spark.createDataFrame([
    ('1', 'Computer Science', 'MIT', '3.8'),
    ('2', 'Electrical Engineering', 'Stanford', '3.5'),
    ('3', 'Physics', 'Princeton', '3.9'),
    ('6', 'Mathematics', 'Harvard', '3.7'),
    ('7', 'Chemistry', 'Yale', '3.6'),
], ['Id', 'Major', 'University', 'GPA'])
The two data frames share the ‘Id’ column, which we will use as the join key.

Inner Join

The inner join in PySpark returns only the rows whose key exists in both data frames.

inner_join_df = freshers_personal_details.join(freshers_academic_details, on=['Id'], how='inner')
inner_join_df.show()
Output
+---+------+---------+--------------------+----------+---+
| Id|  Name|     City|               Major|University|GPA|
+---+------+---------+--------------------+----------+---+
|  1|Sachin| New York|    Computer Science|       MIT|3.8|
|  2|Shekar|Bangalore|Electrical Engine...|  Stanford|3.5|
|  3|Antony|  Chicago|             Physics| Princeton|3.9|
+---+------+---------+--------------------+----------+---+
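
The on=['Id'] form merges the key into a single Id column. An equivalent inner join can also be written with an explicit column expression, which is useful when the key columns have different names; note that this form keeps both Id columns, so one is typically dropped afterwards. A minimal sketch:

# Equivalent inner join using an explicit column expression;
# drop the duplicate 'Id' column coming from the right-hand side
expr_join_df = freshers_personal_details.join(
    freshers_academic_details,
    freshers_personal_details['Id'] == freshers_academic_details['Id'],
    how='inner'
).drop(freshers_academic_details['Id'])
expr_join_df.show()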

Left Join (Left Outer Join)

The left join in PySpark returns all rows from the first data frame along with the matching rows from the second data frame. If there is no match, the result is NULL on the right side.

left_join_df = freshers_personal_details.join(freshers_academic_details, on=['Id'], how='left')
left_join_df.show()
Output
+---+------+---------+--------------------+----------+----+
| Id|  Name|     City|               Major|University| GPA|
+---+------+---------+--------------------+----------+----+
|  1|Sachin| New York|    Computer Science|       MIT| 3.8|
|  2|Shekar|Bangalore|Electrical Engine...|  Stanford| 3.5|
|  3|Antony|  Chicago|             Physics| Princeton| 3.9|
|  5| Vijay|   London|                null|      null|null|
|  4|Sharat|    Delhi|                null|      null|null|
+---+------+---------+--------------------+----------+----+
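
A common follow-up is isolating the left-side rows that found no match. A minimal sketch, filtering the left join result above on a column that is NULL only when the right side is missing:

from pyspark.sql.functions import col
# Freshers with no academic record: 'Major' is NULL only for unmatched rows
unmatched_df = left_join_df.filter(col('Major').isNull())
unmatched_df.show()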

Right Join (Right Outer Join)

The right join in PySpark returns all rows from the second data frame and the matching rows from the first data frame. If there is no match, the result is NULL on the left side.

right_join_df = freshers_personal_details.join(freshers_academic_details, on=['Id'], how='right')
right_join_df.show()
Output
+---+------+---------+--------------------+----------+---+
| Id|  Name|     City|               Major|University|GPA|
+---+------+---------+--------------------+----------+---+
|  1|Sachin| New York|    Computer Science|       MIT|3.8|
|  2|Shekar|Bangalore|Electrical Engine...|  Stanford|3.5|
|  7|  null|     null|           Chemistry|      Yale|3.6|
|  3|Antony|  Chicago|             Physics| Princeton|3.9|
|  6|  null|     null|         Mathematics|   Harvard|3.7|
+---+------+---------+--------------------+----------+---+
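
A right join is equivalent to a left join with the data frames swapped. The sketch below should return the same rows as the result above, although the academic columns now come first:

# Same rows as the right join above, expressed as a swapped left join;
# only the column order differs
swapped_left_df = freshers_academic_details.join(freshers_personal_details, on=['Id'], how='left')
swapped_left_df.show()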

Full Outer Join

The full outer join in PySpark returns all rows from both data frames, matching them on the key where possible and filling in NULL where there is no match.

full_outer_join_df = freshers_personal_details.join(freshers_academic_details, on=['Id'], how='outer')
full_outer_join_df.show()
Output
+---+------+---------+--------------------+----------+----+
| Id|  Name|     City|               Major|University| GPA|
+---+------+---------+--------------------+----------+----+
|  1|Sachin| New York|    Computer Science|       MIT| 3.8|
|  2|Shekar|Bangalore|Electrical Engine...|  Stanford| 3.5|
|  3|Antony|  Chicago|             Physics| Princeton| 3.9|
|  4|Sharat|    Delhi|                null|      null|null|
|  5| Vijay|   London|                null|      null|null|
|  6|  null|     null|         Mathematics|   Harvard| 3.7|
|  7|  null|     null|           Chemistry|      Yale| 3.6|
+---+------+---------+--------------------+----------+----+
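
Note that 'full' and 'full_outer' are accepted aliases for 'outer'. The full outer result also makes it easy to spot rows present on only one side. A minimal sketch, relying on the fact that Name is never NULL in the personal data and Major is never NULL in the academic data:

from pyspark.sql.functions import col
# Rows that exist in only one of the two data frames
one_sided_df = full_outer_join_df.filter(col('Name').isNull() | col('Major').isNull())
one_sided_df.show()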

Left Semi Join

The left semi join in PySpark returns the rows from the first data frame that have a match in the second data frame on the key. Only the columns of the first data frame are kept.

left_semi_join_df = freshers_personal_details.join(freshers_academic_details, on=['Id'], how='leftsemi')
left_semi_join_df.show()
Output
+---+------+---------+
| Id|  Name|     City|
+---+------+---------+
|  1|Sachin| New York|
|  2|Shekar|Bangalore|
|  3|Antony|  Chicago|
+---+------+---------+
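
A left semi join behaves like a SQL EXISTS filter. A minimal sketch of the same result via Spark SQL; the view names 'personal' and 'academic' are just illustrative choices:

# Register the data frames as temporary views for SQL access
freshers_personal_details.createOrReplaceTempView('personal')
freshers_academic_details.createOrReplaceTempView('academic')
spark.sql("""
    SELECT p.* FROM personal p
    WHERE EXISTS (SELECT 1 FROM academic a WHERE a.Id = p.Id)
""").show()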

Left Anti Join

The left anti join in PySpark returns all the rows from the first data frame that have no match in the second data frame on the key. Like the semi join, it keeps only the columns of the first data frame.

left_anti_join_df = freshers_personal_details.join(freshers_academic_details, on=['Id'], how='leftanti')
left_anti_join_df.show()
Output
+---+------+------+
| Id|  Name|  City|
+---+------+------+
|  5| Vijay|London|
|  4|Sharat| Delhi|
+---+------+------+
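
Conversely, the left anti join corresponds to a SQL NOT EXISTS filter. Reusing the temporary views registered in the semi join sketch above:

# Same rows as the left anti join, expressed in Spark SQL
spark.sql("""
    SELECT p.* FROM personal p
    WHERE NOT EXISTS (SELECT 1 FROM academic a WHERE a.Id = p.Id)
""").show()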

Joins with Multiple Conditions

In PySpark, we can also perform join operations based on multiple conditions.

freshers_additional_details = spark.createDataFrame([
    ('1', 'Sachin', 'Python'),
    ('2', 'Shekar', 'Java'),
    ('3', 'Sanjo', 'C++'),
    ('6', 'Rakesh', 'Scala'),
    ('7', 'Sorya', 'JavaScript'),
], ['Id', 'Name', 'Programming_Language'])
# Perform inner join based on multiple conditions
multi_condition_join_df = freshers_personal_details.join(
    freshers_additional_details, 
    (freshers_personal_details['Id'] == freshers_additional_details['Id']) & 
    (freshers_personal_details['Name'] == freshers_additional_details['Name']),
    how='inner'
)
multi_condition_join_df.show()
Output
+---+------+---------+---+------+--------------------+
| Id|  Name|     City| Id|  Name|Programming_Language|
+---+------+---------+---+------+--------------------+
|  1|Sachin| New York|  1|Sachin|              Python|
|  2|Shekar|Bangalore|  2|Shekar|                Java|
+---+------+---------+---+------+--------------------+
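
Because the join condition references columns from both data frames, the result keeps both copies of Id and Name. A minimal sketch of one way to tidy this, dropping the duplicates contributed by the right-hand data frame:

# Remove the duplicate key columns coming from freshers_additional_details
tidy_df = (multi_condition_join_df
    .drop(freshers_additional_details['Id'])
    .drop(freshers_additional_details['Name']))
tidy_df.show()
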
Note: When working with larger datasets, the choice of join type and the order of operations can have a significant impact on the performance of the Spark application.
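
For example, when one side of a join is small enough to fit in executor memory, broadcasting it avoids shuffling the larger side. A minimal sketch using the built-in broadcast hint (the toy data frames above stand in for a genuinely small and a genuinely large table):

from pyspark.sql.functions import broadcast
# Hint Spark to ship the smaller data frame to every executor,
# so the larger side does not need to be shuffled
broadcast_join_df = freshers_personal_details.join(
    broadcast(freshers_academic_details), on=['Id'], how='inner')
broadcast_join_df.show()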