This PySpark script converts the country codes in a DataFrame to their full names. It begins by creating a Spark session, the entry point for any PySpark application. A dictionary that maps country codes to country names is converted into a DataFrame, countryCodesDF, so that it can be used in ordinary DataFrame operations. The script then creates a sample DataFrame, peopleDF, that holds each person's first name, last name, country code, and gender. It joins peopleDF with countryCodesDF on the countryCode column. After the join, the script selects the columns for the result, using countryName in place of countryCode. Finally, it displays the transformed DataFrame. Because the lookup table is itself a DataFrame, the mapping is done with a standard join. This avoids passing a Python dictionary (a complex type) directly into Spark operations, which makes the solution more robust.
'''
Created on Fri Dec 15 2023
@author: freshers.in
'''
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder.appName('Learning @ Freshers.in').getOrCreate()
# Dictionary of country codes and their full names
countryCodes = {"US":"United States", "UK":"United Kingdom", "AU":"Australia"}
# Converting the dictionary to a DataFrame
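# Converting the dictionary items to a list of (code, name) tuples first keeps the input a plain list
countryCodesDF = spark.createDataFrame(list(countryCodes.items()), ["countryCode", "countryName"])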
# Sample data
peopleData = [("John","Doe","US","M"),
("Emma","Green","UK","F"),
("Oliver","Brown","AU","M"),
("Alice","Wilson","US","F")
]
# Columns for the DataFrame
columnNames = ["firstName","lastName","countryCode","gender"]
peopleDF = spark.createDataFrame(data = peopleData, schema = columnNames)
# Displaying the schema and data
peopleDF.printSchema()
peopleDF.show(truncate=False)
# Joining the DataFrames to transform country codes to full names
joinedDF = peopleDF.join(countryCodesDF, "countryCode").select(
col("firstName"), col("lastName"), col("countryName"), col("gender"))
# Showing the transformed DataFrame
joinedDF.show(truncate=False)
Output
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- countryCode: string (nullable = true)
 |-- gender: string (nullable = true)

+---------+--------+-----------+------+
|firstName|lastName|countryCode|gender|
+---------+--------+-----------+------+
|John     |Doe     |US         |M     |
|Emma     |Green   |UK         |F     |
|Oliver   |Brown   |AU         |M     |
|Alice    |Wilson  |US         |F     |
+---------+--------+-----------+------+

+---------+--------+--------------+------+
|firstName|lastName|countryName   |gender|
+---------+--------+--------------+------+
|Oliver   |Brown   |Australia     |M     |
|Emma     |Green   |United Kingdom|F     |
|John     |Doe     |United States |M     |
|Alice    |Wilson  |United States |F     |
+---------+--------+--------------+------+
An alternative version of this script solves the same problem with a broadcast variable instead of a join. Its steps are:
- Broadcast Variable Creation:
  - A dictionary, countryCodes, is created that maps country codes to their full names. A broadcast variable, broadcastCountryCodes, is then created from this dictionary. This shares a read-only copy of the data efficiently with every node in the Spark cluster.
- Data Preparation:
  - Sample data, peopleData, is created as a list of tuples. Each tuple holds a person's first name, last name, country code, and gender.
  - The columnNames list supplies the column names for the DataFrame. The column types are inferred from the data.
- DataFrame Creation:
  - Using createDataFrame, a DataFrame, peopleDF, is created from peopleData with the schema defined by columnNames.
  - The schema and the data are displayed using printSchema and show.
- Country Name Conversion Function:
  - The function countryName is defined to convert a country code to its full name by looking it up in the broadcast variable.
- DataFrame Transformation:
  - The original DataFrame is transformed with an RDD map transformation. This applies the countryName function to each row, converting the country code to its full name. The result is stored in transformedDF.
- DataFrame Filtering:
  - Lastly, filteredDF is created by filtering peopleDF with the isin method. It keeps only the rows whose countryCode is present in the broadcast variable, demonstrating the use of broadcast variables in filtering operations.
The join-based script above works as follows:
- Country Codes as DataFrame: The countryCodes dictionary is converted into a DataFrame, countryCodesDF, with two columns: countryCode and countryName.
- Join Operation: peopleDF is joined with countryCodesDF on the countryCode column, so each row gains the countryName that matches its code. The join is an inner join (the default), so any row whose code has no match in countryCodesDF would be dropped.
- Selecting Columns: After the join, a select operation keeps the desired columns: firstName, lastName, countryName, and gender. As a result, countryName takes the place of countryCode in the output.
- Displaying Transformed Data: The transformed data, which now includes the full country names, is displayed by calling show on the joined DataFrame, joinedDF. As the output shows, the rows can appear in a different order than in peopleDF, because a join does not preserve the input order.