Concatenating two or more maps into a single map: map_concat


The map_concat function in PySpark concatenates two or more maps into a single map, merging the key-value pairs from each input. It is particularly useful when data from different sources needs to be combined into one structure. The function is imported from pyspark.sql.functions:

from pyspark.sql.functions import map_concat
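As a minimal, self-contained sketch (the column names m1 and m2 are illustrative), map_concat takes two or more map columns and returns a single merged map:

from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map, lit, map_concat

spark = SparkSession.builder.appName("map_concat syntax").getOrCreate()
# Build two single-entry map columns from literals
df = spark.range(1).select(
    create_map(lit("a"), lit("1")).alias("m1"),
    create_map(lit("b"), lit("2")).alias("m2"))
# Keys are distinct here, so no deduplication policy is needed
df.select(map_concat("m1", "m2").alias("merged")).show(truncate=False)
# +----------------+
# |merged          |
# +----------------+
# |{a -> 1, b -> 2}|
# +----------------+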

Advantages of using map_concat

  • Data integration: combines data from multiple maps seamlessly.
  • Simplicity: merges key-value pairs without complex transformations.
  • Efficiency: simplifies the resulting data structure, streamlining downstream processing.

Example: Merging personal details

Imagine a scenario with two sets of data for the same individuals: one map associates each name with a country, and the other associates each name with a profession. The aim is to merge these datasets for a more comprehensive view.

Names Map:

{"Sachin": "India"}
{"Ram": "Nepal"}
{"Raju": "Bangladesh"}
{"David": "USA"}
{"Wilson": "Australia"}

Profession Map:

{"Sachin": "Cricketer"}
{"Ram": "Artist"}
{"Raju": "Engineer"}
{"David": "Musician"}
{"Wilson": "Doctor"}

Objective

Merge the name and profession maps into a single map for each individual.

Implementation in PySpark

Setting up the PySpark environment and creating the DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.functions import map_concat
# Initialize Spark Session
spark = SparkSession.builder.appName("MapConcat Example").getOrCreate()
# Sample Data
data = [({"Sachin": "India"}, {"Sachin": "Cricketer"}),
        ({"Ram": "Nepal"}, {"Ram": "Artist"}),
        ({"Raju": "Bangladesh"}, {"Raju": "Engineer"}),
        ({"David": "USA"}, {"David": "Musician"}),
        ({"Wilson": "Australia"}, {"Wilson": "Doctor"})]
# Creating DataFrame
df = spark.createDataFrame(data, ["Name Map", "Profession Map"])
df.show()
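
To confirm that the Python dictionaries were inferred as map columns (MapType) rather than structs, the schema can be inspected:

df.printSchema()
# root
#  |-- Name Map: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)
#  |-- Profession Map: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)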

Set spark.sql.mapKeyDedupPolicy to LAST_WIN

By default, Spark does not allow duplicate keys in a map and raises a runtime error when one is produced. Setting spark.sql.mapKeyDedupPolicy to LAST_WIN tells Spark to resolve duplicates automatically, keeping the value from the last map in which the key appears:

spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

Without this setting, there is a high probability of an error such as "Caused by: org.apache.spark.SparkRuntimeException: [DUPLICATED_MAP_KEY] Duplicate map key Sachin was found...", which indicates that the operation produced a map with duplicate keys. This issue often arises with functions like map_concat: if the same key exists in both maps being combined, Spark throws this error.
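
To verify which deduplication policy is active in the current session (the built-in default is EXCEPTION), the configuration can be read back:

# Read the active map-key deduplication policy
spark.conf.get("spark.sql.mapKeyDedupPolicy")  # 'LAST_WIN' after the set call above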

Applying the map_concat function

With the policy in place, the merge is a single withColumn call:

# Merge the two map columns; duplicate keys resolve to the last map's value
combined_df = df.withColumn("Combined Map", map_concat("Name Map", "Profession Map"))
combined_df.show()

Output

This will result in a DataFrame with combined maps:

+--------------------+--------------------+--------------------+
|            Name Map|      Profession Map|        Combined Map|
+--------------------+--------------------+--------------------+
|   {Sachin -> India}|{Sachin -> Cricke...|{Sachin -> Cricke...|
|      {Ram -> Nepal}|     {Ram -> Artist}|     {Ram -> Artist}|
|{Raju -> Bangladesh}|  {Raju -> Engineer}|  {Raju -> Engineer}|
|      {David -> USA}| {David -> Musician}| {David -> Musician}|
|{Wilson -> Austra...|  {Wilson -> Doctor}|  {Wilson -> Doctor}|
+--------------------+--------------------+--------------------+
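
The same merge can also be written in Spark SQL; a sketch assuming the DataFrame is registered as the temporary view people_maps (the view name is arbitrary), with backticks because the column names contain spaces:

df.createOrReplaceTempView("people_maps")
spark.sql("""
    SELECT `Name Map`, `Profession Map`,
           map_concat(`Name Map`, `Profession Map`) AS `Combined Map`
    FROM people_maps
""").show()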

Manually handling duplicate keys

Before concatenating the maps, you can handle duplicate keys yourself. This approach is more involved but gives you full control over how duplicates are resolved: for instance, you might merge the values of duplicate keys or choose which value to keep based on specific criteria. (A variant that merges both values is sketched after the output below.)

Here’s a simplified approach to demonstrate this:

from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

def merge_maps(map1, map2):
    # Merge two dictionaries; on duplicate keys, map2's value overrides map1's
    merged_map = {**map1, **map2}  # Simple override; customize as needed
    return merged_map

merge_maps_udf = udf(merge_maps, MapType(StringType(), StringType()))
data = [({"Sachin": "India"}, {"Sachin": "Cricketer"}),
        ({"Ram": "Nepal"}, {"Ram": "Artist"}),
        ({"Raju": "Bangladesh"}, {"Raju": "Engineer"}),
        ({"David": "USA"}, {"David": "Musician"}),
        ({"Wilson": "Australia"}, {"Wilson": "Doctor"})]
# Creating DataFrame with Maps
df_maps = spark.createDataFrame(data, ["Name Map", "Profession Map"])
# Using the UDF to merge maps
combined_df = df_maps.withColumn("Combined Map", merge_maps_udf("Name Map", "Profession Map"))
combined_df.show()

Output

+--------------------+--------------------+--------------------+
|            Name Map|      Profession Map|        Combined Map|
+--------------------+--------------------+--------------------+
|   {Sachin -> India}|{Sachin -> Cricke...|{Sachin -> Cricke...|
|      {Ram -> Nepal}|     {Ram -> Artist}|     {Ram -> Artist}|
|{Raju -> Bangladesh}|  {Raju -> Engineer}|  {Raju -> Engineer}|
|      {David -> USA}| {David -> Musician}| {David -> Musician}|
|{Wilson -> Austra...|  {Wilson -> Doctor}|  {Wilson -> Doctor}|
+--------------------+--------------------+--------------------+
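
The merge_maps function above simply lets the second map win. If you would rather keep both values when a key collides, the UDF body can be adapted; a minimal sketch (the merge_maps_concat_values name and the " / " separator are illustrative choices, not part of the original example):

def merge_maps_concat_values(map1, map2):
    # Start from map1, then fold in map2, joining values on key collisions
    merged = dict(map1)
    for key, value in map2.items():
        merged[key] = f"{merged[key]} / {value}" if key in merged else value
    return merged

merge_concat_udf = udf(merge_maps_concat_values, MapType(StringType(), StringType()))
df_maps.withColumn("Combined Map", merge_concat_udf("Name Map", "Profession Map")).show(truncate=False)
# For Sachin, "Combined Map" now contains {Sachin -> India / Cricketer}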
