PySpark: Understanding Broadcast Joins in PySpark with a detailed example

PySpark @ Freshers.in

In this article, we will explore broadcast joins in PySpark, an optimization technique used when joining a large DataFrame with a smaller one. Broadcasting the smaller DataFrame to every worker node avoids shuffling the larger DataFrame across the network, which improves join performance. We will walk through a detailed example using hardcoded values as input.

Prerequisites

  • Python 3.7 or higher
  • PySpark library
  • Java 8 or higher

Let’s create two PySpark DataFrames with hardcoded values:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("Broadcast Join Example @ Freshers.in") \
    .getOrCreate()

# Schema for the larger "orders" DataFrame (the fact table in this example)
orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
])

orders_data = spark.createDataFrame([
    (1, 101, 1001),
    (2, 102, 1002),
    (3, 103, 1001),
    (4, 104, 1003),
    (5, 105, 1002),
], orders_schema)

# Schema for the smaller "products" DataFrame -- the one we will broadcast
products_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", IntegerType(), True),
])

products_data = spark.createDataFrame([
    (1001, "Product A", 50),
    (1002, "Product B", 60),
    (1003, "Product C", 70),
], products_schema)

orders_data.show()
products_data.show()

Performing Broadcast Join

Now, let’s use the broadcast join to join the orders_data DataFrame with the products_data DataFrame:

from pyspark.sql.functions import broadcast

# Hint Spark to ship products_data to every executor instead of shuffling orders_data
joined_data = orders_data.join(broadcast(products_data), on="product_id", how="inner")
joined_data.show()

In this example, we used the broadcast function from pyspark.sql.functions to hint that the products_data DataFrame should be broadcast to all worker nodes. This is useful when joining a small DataFrame (in this case, products_data) with a large DataFrame (in this case, orders_data). Broadcasting the smaller DataFrame reduces the amount of data shuffling and network overhead, resulting in improved performance.

It’s essential to broadcast only small DataFrames because broadcasting a large DataFrame can cause memory issues due to the replication of data across all worker nodes.

Analyzing the Join Results

The resulting joined_data DataFrame contains the following columns:

  • order_id
  • customer_id
  • product_id
  • product_name
  • price

This DataFrame provides a combined view of the orders and products, allowing for further analysis, such as calculating the total order value or finding the most popular products.

In this article, we explored broadcast joins in PySpark, an optimization technique for joining a large DataFrame with a smaller DataFrame. We provided a detailed example using hardcoded values as input to create two DataFrames and perform a broadcast join. This method can significantly improve performance by reducing data shuffling and network overhead during join operations. However, it’s crucial to use broadcast joins only with small DataFrames, as broadcasting large DataFrames can cause memory issues.
