PySpark: Exploring PySpark’s joinByKey on DataFrames [combining data from two different DataFrames] – A Comprehensive Guide


In PySpark, join operations are a fundamental technique for combining data from two different DataFrames based on a common key. While there is no dedicated joinByKey function for DataFrames, PySpark's DataFrame.join method supports several join types through its how parameter. In this article, we will explore the different types of join operations available in PySpark for DataFrames and provide a concrete example with hardcoded values instead of reading from a file.

Types of Join Operations in PySpark for DataFrames

  1. Inner join: Combines rows from both DataFrames that have matching keys.
  2. Left outer join: Retains all rows from the left DataFrame and matching rows from the right DataFrame, filling with null values when there is no match.
  3. Right outer join: Retains all rows from the right DataFrame and matching rows from the left DataFrame, filling with null values when there is no match.
  4. Full outer join: Retains all rows from both DataFrames, filling with null values when there is no match.
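To see how these four types map onto the DataFrame API, the short sketch below joins two small illustrative DataFrames (left_df, right_df and their common column id are made-up sample names, not part of the example that follows) and selects the join type through the how argument of DataFrame.join:

from pyspark.sql import SparkSession

# Illustrative sketch only: left_df, right_df and the "id" column are sample data
spark = SparkSession.builder.appName("join types sketch").getOrCreate()

left_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "left_val"])
right_df = spark.createDataFrame([(2, "x"), (3, "y")], ["id", "right_val"])

inner_df = left_df.join(right_df, on="id", how="inner")        # only id 2
left_outer_df = left_df.join(right_df, on="id", how="left")    # ids 1 and 2
right_outer_df = left_df.join(right_df, on="id", how="right")  # ids 2 and 3
full_outer_df = left_df.join(right_df, on="id", how="outer")   # ids 1, 2 and 3

The same join call is used in each case; only the how value changes ("left", "right" and "outer" also accept the longer aliases "left_outer", "right_outer" and "full_outer").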

Inner join using DataFrames

Suppose we have two datasets, one containing sales data for a chain of stores, and the other containing store information. The sales data includes store ID, product ID, and the number of units sold, while the store information includes store ID and store location. Our goal is to combine these datasets based on store ID.

#Exploring PySpark's joinByKey on DataFrames: A Comprehensive Guide @ Freshers.in 
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Initialize the Spark session
spark = SparkSession.builder.appName("join example @ Freshers.in ").getOrCreate()

# Sample sales data as (store_id, product_id, units_sold)
sales_data = [
    Row(store_id=1, product_id=6567876, units_sold=5),
    Row(store_id=2, product_id=6567876, units_sold=7),
    Row(store_id=1, product_id=102, units_sold=3),
    Row(store_id=2, product_id=9878767, units_sold=10),
    Row(store_id=3, product_id=6567876, units_sold=4),
    Row(store_id=3, product_id=5565455, units_sold=6),
    Row(store_id=4, product_id=9878767, units_sold=6),
    Row(store_id=4, product_id=5565455, units_sold=6),
    Row(store_id=4, product_id=9878767, units_sold=6),
    Row(store_id=5, product_id=5565455, units_sold=6),
]

# Sample store information as (store_id, store_location)
store_info = [
    Row(store_id=1, store_location="New York"),
    Row(store_id=2, store_location="Los Angeles"),
    Row(store_id=3, store_location="Chicago"),
    Row(store_id=1, store_location="Maryland"),
    Row(store_id=2, store_location="Texas")
]

# Create DataFrames from the sample data
sales_df = spark.createDataFrame(sales_data)
store_info_df = spark.createDataFrame(store_info)

# Perform the join operation
joined_df = sales_df.join(store_info_df, on="store_id", how="inner")

# Collect the results and print
for row in joined_df.collect():
    print(f"Store {row.store_id} ({row.store_location}) sales data: (Product {row.product_id}, Units Sold {row.units_sold})")

Output:

Store 1 (New York) sales data: (Product 6567876, Units Sold 5)
Store 1 (Maryland) sales data: (Product 6567876, Units Sold 5)
Store 1 (New York) sales data: (Product 102, Units Sold 3)
Store 1 (Maryland) sales data: (Product 102, Units Sold 3)
Store 2 (Los Angeles) sales data: (Product 6567876, Units Sold 7)
Store 2 (Texas) sales data: (Product 6567876, Units Sold 7)
Store 2 (Los Angeles) sales data: (Product 9878767, Units Sold 10)
Store 2 (Texas) sales data: (Product 9878767, Units Sold 10)
Store 3 (Chicago) sales data: (Product 6567876, Units Sold 4)
Store 3 (Chicago) sales data: (Product 5565455, Units Sold 6)
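In the output above, stores 4 and 5 do not appear because they have no matching store_id in the store information data, and stores 1 and 2 show up twice per sale because the sample store information lists two locations for each of those IDs: an inner join emits one row for every matching pair of keys. If the unmatched sales rows should be kept as well, the same DataFrames can be combined with a left outer join, which fills store_location with null where there is no match. A minimal sketch, reusing sales_df and store_info_df from the example above:

# Left outer join: keeps every sales row; store_location is null for stores 4 and 5
left_joined_df = sales_df.join(store_info_df, on="store_id", how="left")
left_joined_df.show()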