Analyzing User rankings over time using PySpark's RANK and LAG Functions

Understanding shifts in user rankings based on their transactional behavior provides valuable insights into user trends and preferences. Utilizing the power of PySpark, we can leverage functions like RANK and LAG to perform such complex analyses. This article delves deep into this topic, providing a step-by-step guide and a sample dataset for hands-on experience. Let’s start with a simple dataset that contains user transactions:


user_id	transaction_date	amount
101	2023-08-01	50
102	2023-08-02	100
103	2023-08-03	75
101	2023-08-04	90
102	2023-08-05	120
Loading the Data

from pyspark.sql import SparkSession
from pyspark.sql import Row
# Setting up PySpark session
spark = SparkSession.builder \
    .appName("User Rankings Analysis @ Freshers.in") \
    .getOrCreate()
# Sample Data
data = [
    Row(user_id=101, transaction_date="2023-08-01", amount=50),
    Row(user_id=102, transaction_date="2023-08-02", amount=100),
    Row(user_id=103, transaction_date="2023-08-03", amount=75),
    Row(user_id=101, transaction_date="2023-08-04", amount=90),
    Row(user_id=102, transaction_date="2023-08-05", amount=120)
]
# Convert list to DataFrame
df = spark.createDataFrame(data)
df.show()

Result

+-------+----------------+------+
|user_id|transaction_date|amount|
+-------+----------------+------+
|    101|      2023-08-01|    50|
|    102|      2023-08-02|   100|
|    103|      2023-08-03|    75|
|    101|      2023-08-04|    90|
|    102|      2023-08-05|   120|
+-------+----------------+------+
Analyzing User Rankings using RANK and LAG Using the RANK Function To rank users based on their transaction amounts:

from pyspark.sql.window import Window
from pyspark.sql import functions as F
windowSpec = Window.orderBy(F.desc("amount"))
ranked_df = df.withColumn("rank", F.rank().over(windowSpec))
ranked_df.show()

+-------+----------------+------+----+
|user_id|transaction_date|amount|rank|
+-------+----------------+------+----+
|    102|      2023-08-05|   120|   1|
|    102|      2023-08-02|   100|   2|
|    101|      2023-08-04|    90|   3|
|    103|      2023-08-03|    75|   4|
|    101|      2023-08-01|    50|   5|
+-------+----------------+------+----+
Using the LAG Function To understand how a user’s rank changes over time, the LAG function can be very helpful. Let’s see how to find out the previous rank of a user for comparison:

windowSpecUser = Window.partitionBy("user_id").orderBy("transaction_date")
lagged_df = ranked_df.withColumn("prev_rank", F.lag("rank").over(windowSpecUser))
lagged_df.show()
Result

+-------+----------------+------+----+---------+
|user_id|transaction_date|amount|rank|prev_rank|
+-------+----------------+------+----+---------+
|    101|      2023-08-01|    50|   5|     null|
|    101|      2023-08-04|    90|   3|        5|
|    102|      2023-08-02|   100|   2|     null|
|    102|      2023-08-05|   120|   1|        2|
|    103|      2023-08-03|    75|   4|     null|
+-------+----------------+------+----+---------+
This DataFrame now has an additional column, prev_rank, which displays the rank of the user’s previous transaction. A NULL indicates it was the user’s first transaction in the dataset. Drawing Insights Using the above approach, you can: 1. Identify users who have climbed or fallen in rankings. 2. Understand transaction patterns of top-performing users. 3. Forecast future trends based on historical ranking data.