PySpark : Finding the Index of the First Occurrence of an Element in an Array in PySpark

PySpark @ Freshers.in

This article will walk you through the steps on how to find the index of the first occurrence of an element in an array in PySpark with a working example.

Installing PySpark

Before we get started, you’ll need to have PySpark installed. You can install it via pip:

pip install pyspark

Creating the DataFrame

Let’s first create a PySpark DataFrame with an array column for demonstration purposes.

from pyspark.sql import SparkSession
from pyspark.sql.functions import array
# Initiate a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame
data = [("fruits", ["apple", "banana", "cherry", "date", "elderberry"]),
        ("numbers", ["one", "two", "three", "four", "five"]),
        ("colors", ["red", "blue", "green", "yellow", "pink"])]
df = spark.createDataFrame(data, ["Category", "Items"])
df.show(20,False)
Source data
+--------+-----------------------------------------+
|Category|Items                                    |
+--------+-----------------------------------------+
|fruits  |[apple, banana, cherry, date, elderberry]|
|numbers |[one, two, three, four, five]            |
|colors  |[red, blue, green, yellow, pink]         |
+--------+-----------------------------------------+

Defining the UDF

Since PySpark doesn’t have a built-in function to find the index of an element in an array, we’ll need to create a User-Defined Function (UDF).

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Define the UDF to find the index
def find_index(array, item):
    try:
        return array.index(item)
    except ValueError:
        return None
# Register the UDF
find_index_udf = udf(find_index, IntegerType())

This UDF takes two arguments: an array and an item. It tries to return the index of the item in the array. If the item is not found, it returns None.

Applying the UDF

To pass a literal value to the UDF, you should use the lit function from pyspark.sql.functions. Here’s how you should modify your code:

Finally, we’ll apply the UDF to our DataFrame to find the index of an element.

from pyspark.sql.functions import lit
# Use the UDF to find the index
df = df.withColumn("ItemIndex", find_index_udf(df["Items"], lit("three")))
df.show(20,False)
Final Output
+--------+-----------------------------------------+---------+
|Category|Items                                    |ItemIndex|
+--------+-----------------------------------------+---------+
|fruits  |[apple, banana, cherry, date, elderberry]|null     |
|numbers |[one, two, three, four, five]            |2        |
|colors  |[red, blue, green, yellow, pink]         |null     |
+--------+-----------------------------------------+---------+

This will add a new column to the DataFrame, “ItemIndex”, that contains the index of the first occurrence of “three” in the “Items” column. If “three” is not found in an array, the corresponding entry in the “ItemIndex” column will be null.

lit(“three”) creates a Column of literal value “three”, which is then passed to the UDF. This ensures that the UDF correctly interprets “three” as a string value, not a column name.
Author: user

Leave a Reply