Tracking how user rankings shift with transactional behavior gives insight into user trends and preferences. PySpark's window functions, in particular RANK and LAG, make this kind of analysis straightforward. This article walks through the approach step by step, using a small sample dataset you can run yourself.
Let’s start with a simple dataset that contains user transactions:
user_id | transaction_date | amount |
---|---|---|
101 | 2023-08-01 | 50 |
102 | 2023-08-02 | 100 |
103 | 2023-08-03 | 75 |
101 | 2023-08-04 | 90 |
102 | 2023-08-05 | 120 |
Loading the Data
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Setting up PySpark session
spark = SparkSession.builder \
    .appName("User Rankings Analysis @ Freshers.in") \
    .getOrCreate()
# Sample Data
data = [
    Row(user_id=101, transaction_date="2023-08-01", amount=50),
    Row(user_id=102, transaction_date="2023-08-02", amount=100),
    Row(user_id=103, transaction_date="2023-08-03", amount=75),
    Row(user_id=101, transaction_date="2023-08-04", amount=90),
    Row(user_id=102, transaction_date="2023-08-05", amount=120)
]
# Convert list to DataFrame
df = spark.createDataFrame(data)
df.show()
Result
+-------+----------------+------+
|user_id|transaction_date|amount|
+-------+----------------+------+
|    101|      2023-08-01|    50|
|    102|      2023-08-02|   100|
|    103|      2023-08-03|    75|
|    101|      2023-08-04|    90|
|    102|      2023-08-05|   120|
+-------+----------------+------+
Analyzing User Rankings using RANK and LAG
Using the RANK Function
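To rank every transaction by amount, from highest to lowest, across all users:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# No partitionBy: a single global ranking over all rows.
# Spark moves all rows to one partition for this, which is fine for small data.
windowSpec = Window.orderBy(F.desc("amount"))
# rank() leaves gaps after ties; none occur in this sample.
ranked_df = df.withColumn("rank", F.rank().over(windowSpec))
ranked_df.show()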
Result
+-------+----------------+------+----+
|user_id|transaction_date|amount|rank|
+-------+----------------+------+----+
|    102|      2023-08-05|   120|   1|
|    102|      2023-08-02|   100|   2|
|    101|      2023-08-04|    90|   3|
|    103|      2023-08-03|    75|   4|
|    101|      2023-08-01|    50|   5|
+-------+----------------+------+----+
Using the LAG Function
To see how the rank of a user’s transactions changes over time, the LAG function is useful. Partitioning by user and ordering by date, it returns the rank of that user’s previous transaction for comparison:
windowSpecUser = Window.partitionBy("user_id").orderBy("transaction_date")
lagged_df = ranked_df.withColumn("prev_rank", F.lag("rank").over(windowSpecUser))
lagged_df.show()
Result
+-------+----------------+------+----+---------+
|user_id|transaction_date|amount|rank|prev_rank|
+-------+----------------+------+----+---------+
|    101|      2023-08-01|    50|   5|     null|
|    101|      2023-08-04|    90|   3|        5|
|    102|      2023-08-02|   100|   2|     null|
|    102|      2023-08-05|   120|   1|        2|
|    103|      2023-08-03|    75|   4|     null|
+-------+----------------+------+----+---------+
This DataFrame now has an additional column, prev_rank, which holds the rank of the same user’s previous transaction. A null value indicates that the row is the user’s first transaction in the dataset.
Drawing Insights
Using the above approach, you can:
1. Identify users who have climbed or fallen in rankings.
2. Understand transaction patterns of top-performing users.
3. Forecast future trends based on historical ranking data.