Learn to use broadcast variables: Advanced Data Transformation in PySpark

PySpark @ Freshers.in

This PySpark script converts country codes to full country names in a DataFrame. It first creates a Spark session, the entry point for any PySpark application. A dictionary mapping country codes to country names is then converted into a small lookup DataFrame, countryCodesDF, so that it can take part in ordinary DataFrame operations. Next, the script builds a sample DataFrame, peopleDF, holding each person's first name, last name, country code, and gender, and joins it with countryCodesDF on the countryCode column. Because the select after the join keeps countryName instead of countryCode, the result shows each person's full country name in place of the code. Finally, the transformed DataFrame is displayed. Rather than trying to use the Python dictionary inside column expressions directly, this approach expresses the lookup as a DataFrame join that Spark can plan and optimize. For a lookup table this small, Spark typically chooses a broadcast join on its own, sending a copy of countryCodesDF to every executor.

'''
Created on Fri Dec 15 2023
@author: freshers.in
'''
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName('Learning @ Freshers.in').getOrCreate()

# Dictionary of country codes and their full names
countryCodes = {"US":"United States", "UK":"United Kingdom", "AU":"Australia"}

# Converting the dictionary to a two-column lookup DataFrame (list() materializes the dict items)
countryCodesDF = spark.createDataFrame(list(countryCodes.items()), ["countryCode", "countryName"])

# Sample data
peopleData = [("John","Doe","US","M"),
    ("Emma","Green","UK","F"),
    ("Oliver","Brown","AU","M"),
    ("Alice","Wilson","US","F")
  ]

# Columns for the DataFrame
columnNames = ["firstName","lastName","countryCode","gender"]
peopleDF = spark.createDataFrame(data=peopleData, schema=columnNames)

# Displaying the schema and data
peopleDF.printSchema()
peopleDF.show(truncate=False)

# Joining the DataFrames to transform country codes to full names
joinedDF = peopleDF.join(countryCodesDF, "countryCode").select(
    col("firstName"), col("lastName"), col("countryName"), col("gender"))

# Showing the transformed DataFrame
joinedDF.show(truncate=False)

Output

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- countryCode: string (nullable = true)
 |-- gender: string (nullable = true)

+---------+--------+-----------+------+
|firstName|lastName|countryCode|gender|
+---------+--------+-----------+------+
|John     |Doe     |US         |M     |
|Emma     |Green   |UK         |F     |
|Oliver   |Brown   |AU         |M     |
|Alice    |Wilson  |US         |F     |
+---------+--------+-----------+------+

+---------+--------+--------------+------+
|firstName|lastName|countryName   |gender|
+---------+--------+--------------+------+
|Oliver   |Brown   |Australia     |M     |
|Emma     |Green   |United Kingdom|F     |
|John     |Doe     |United States |M     |
|Alice    |Wilson  |United States |F     |
+---------+--------+--------------+------+
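
The steps are explained below. Steps 1, 4, 5 and 6 describe a broadcast-variable variant of the lookup whose code (broadcastCountryCodes, countryName, transformedDF, filteredDF) is not included in the script above. Steps 2 and 3 apply to both versions, and steps 7 to 10 describe the join-based script that produced the output shown.
  1. Broadcast Variable Creation:
    • A dictionary, countryCodes, maps country codes to their full names. A broadcast variable, broadcastCountryCodes, is then created from this dictionary. Spark ships a read-only copy of it to each executor once, instead of serializing the dictionary with every task that uses it.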
  2. Data Preparation:
    • Sample data peopleData is created as a list of tuples, each holding one person's first name, last name, country code, and gender.
    • The columnNames list supplies the column names; Spark infers each column's type (string, in this case) from the data.
  3. DataFrame Creation:
    • Using createDataFrame, a DataFrame peopleDF is created from peopleData with the schema defined by columnNames.
    • The schema and the data are displayed using printSchema and show.
  4. Country Name Conversion Function:
    • The function countryName converts a country code to its full name by looking it up in broadcastCountryCodes.value, the executor-local copy of the dictionary.
  5. DataFrame Transformation:
    • peopleDF is converted to an RDD and transformed with map, which applies countryName to each row's countryCode. The mapped rows are converted back into a DataFrame, transformedDF.
  6. DataFrame Filtering:
    • filteredDF is created by filtering peopleDF with the isin method, keeping only rows whose countryCode is a key of the broadcast dictionary. Because the key list is read from the broadcast variable on the driver, it ends up in the filter expression as literal values.
  7. Country Codes as DataFrame:
    • The countryCodes dictionary is converted into a DataFrame, countryCodesDF, with two columns: countryCode and countryName.
  8. Join Operation:
    • peopleDF is joined with countryCodesDF on the countryCode column. The default join type is inner, so only people whose code appears in countryCodesDF are kept; in this sample every code matches.
  9. Selecting Columns:
    • A select after the join keeps firstName, lastName, countryName, and gender, so countryName takes the place of countryCode in the result.
  10. Displaying Transformed Data:
    • The joined DataFrame, joinedDF, is displayed with the show method and now lists full country names instead of codes.

Author: user