How to transform columns into a list of objects (arrays) on top of group by in PySpark – collect_list and collect_set

PySpark @ Freshers.in

In this article we will see how to return the values of a column as an array of objects, with or without duplicate elements. In other words, we will convert the elements of a column into an array of objects. To implement this we need two Spark SQL functions:

  1. pyspark.sql.functions.collect_list
  2. pyspark.sql.functions.collect_set

pyspark.sql.functions.collect_list

This is an aggregate function that returns a list of objects, keeping duplicates.

pyspark.sql.functions.collect_set

This is also an aggregate function, but it returns a set of objects with duplicate elements eliminated.

Because the order of collected results depends on the order of the rows, which may change after a shuffle, both of the aforementioned functions are non-deterministic. 

Sample data (freshers_emp_tech.csv)

emp_id,name,cloud,tech_stack,hrly_min_pay,hrly_max_pay,country,status
1001,Sam Peter,AWS,EMR,150,168,USA,1
1004,Tim Moris,AWS,ECR,110,123.2,CHINA,1
1002,John Manual,AWS,Athena,150,168,INDIA,0
1003,Eric Burst,GCP,Cloud Run,120,134.4,USA,1
1004,Tim Moris,AWS,AWS Lambda,100,112,CHINA,1
1005,Jack Berry,AWS,EMR,110,123.2,INDIA,1
1001,Sam Peter,GCP,App Engine,120,134.4,USA,1
1002,John Manual,AWS,Redshift,150,168,INDIA,0
1001,Sam Peter,AWS,Redshift,150,168,USA,1
1001,Sam Peter,GCP,Big Query,100,112,USA,1
1003,Eric Burst,GCP,Colud SQL,120,134.4,USA,1
1005,Jack Berry,AWS,Spark,110,123.2,INDIA,1
1003,Eric Burst,GCP,GCS,120,134.4,USA,1
1001,Sam Peter,GCP,DataProc,150,168,USA,1
1001,Sam Peter,AWS,Spark,100,112,USA,1
1004,Tim Moris,AWS,Docker,120,134.4,CHINA,1
1002,John Manual,AWS,Lambda,120,134.4,INDIA,0
1005,Jack Berry,AWS,Redshift,100,112,INDIA,1
1003,Eric Burst,GCP,Anthos,150,168,USA,1

Sample code

#We import the required libraries and read the data from its location

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set
from pyspark.sql.functions import collect_list
spark = SparkSession.builder.appName("Sample code to demonstrate collect_set and collect_list @ freshers.in").getOrCreate()
df = spark.read.csv('/mnt/learning/PySpark/freshers_emp_tech.csv',header=True,inferSchema=True)
df.show()
+------+-----------+-----+----------+------------+------------+-------+------+
|emp_id|       name|cloud|tech_stack|hrly_min_pay|hrly_max_pay|country|status|
+------+-----------+-----+----------+------------+------------+-------+------+
|  1001|  Sam Peter|  AWS|       EMR|         150|       168.0|    USA|     1|
|  1004|  Tim Moris|  AWS|       ECR|         110|       123.2|  CHINA|     1|
|  1002|John Manual|  AWS|    Athena|         150|       168.0|  INDIA|     0|
|  1003| Eric Burst|  GCP| Cloud Run|         120|       134.4|    USA|     1|
|  1004|  Tim Moris|  AWS|AWS Lambda|         100|       112.0|  CHINA|     1|
|  1005| Jack Berry|  AWS|       EMR|         110|       123.2|  INDIA|     1|
|  1001|  Sam Peter|  GCP|App Engine|         120|       134.4|    USA|     1|
|  1002|John Manual|  AWS|  Redshift|         150|       168.0|  INDIA|     0|
|  1001|  Sam Peter|  AWS|  Redshift|         150|       168.0|    USA|     1|
|  1001|  Sam Peter|  GCP| Big Query|         100|       112.0|    USA|     1|
|  1003| Eric Burst|  GCP| Colud SQL|         120|       134.4|    USA|     1|
|  1005| Jack Berry|  AWS|     Spark|         110|       123.2|  INDIA|     1|
|  1003| Eric Burst|  GCP|       GCS|         120|       134.4|    USA|     1|
|  1001|  Sam Peter|  GCP|  DataProc|         150|       168.0|    USA|     1|
|  1001|  Sam Peter|  AWS|     Spark|         100|       112.0|    USA|     1|
|  1004|  Tim Moris|  AWS|    Docker|         120|       134.4|  CHINA|     1|
|  1002|John Manual|  AWS|    Lambda|         120|       134.4|  INDIA|     0|
|  1005| Jack Berry|  AWS|  Redshift|         100|       112.0|  INDIA|     1|
|  1003| Eric Burst|  GCP|    Anthos|         150|       168.0|    USA|     1|
+------+-----------+-----+----------+------------+------------+-------+------+
df.printSchema()
root
 |-- emp_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- cloud: string (nullable = true)
 |-- tech_stack: string (nullable = true)
 |-- hrly_min_pay: integer (nullable = true)
 |-- hrly_max_pay: double (nullable = true)
 |-- country: string (nullable = true)
 |-- status: integer (nullable = true)

#Basic example of how to use collect_list

df.agg(collect_list('tech_stack')).show(20,False)
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_list(tech_stack)                                                                                                                                           |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[EMR, ECR, Athena, Cloud Run, AWS Lambda, EMR, App Engine, Redshift, Redshift, Big Query, Colud SQL, Spark, GCS, DataProc, Spark, Docker, Lambda, Redshift, Anthos]|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+

#Basic example of how to use collect_set

df.agg(collect_set('tech_stack')).show(20,False)
+-----------------------------------------------------------------------------------------------------------------------------------+
|collect_set(tech_stack)                                                                                                            |
+-----------------------------------------------------------------------------------------------------------------------------------+
|[Athena, Colud SQL, Lambda, Docker, App Engine, Spark, Anthos, AWS Lambda, Big Query, Cloud Run, Redshift, DataProc, ECR, EMR, GCS]|
+-----------------------------------------------------------------------------------------------------------------------------------+

#Sample of how to do a sum on the dataframe

df.groupBy(df.name).agg({"hrly_min_pay":"sum"}).show()
+-----------+-----------------+
|       name|sum(hrly_min_pay)|
+-----------+-----------------+
|  Sam Peter|              770|
|John Manual|              420|
| Jack Berry|              320|
|  Tim Moris|              330|
| Eric Burst|              510|
+-----------+-----------------+

#Sample code to use collect_list, collect_set and sum in a single step

df.groupBy(df.name).agg({"tech_stack":"collect_list","cloud":"collect_set","hrly_min_pay":"sum"}).show(20,False)
+-----------+------------------+-------------------------------------------------------+-----------------+
|name       |collect_set(cloud)|collect_list(tech_stack)                               |sum(hrly_min_pay)|
+-----------+------------------+-------------------------------------------------------+-----------------+
|Sam Peter  |[AWS, GCP]        |[EMR, App Engine, Redshift, Big Query, DataProc, Spark]|770              |
|John Manual|[AWS]             |[Athena, Redshift, Lambda]                             |420              |
|Jack Berry |[AWS]             |[EMR, Spark, Redshift]                                 |320              |
|Tim Moris  |[AWS]             |[ECR, AWS Lambda, Docker]                              |330              |
|Eric Burst |[GCP]             |[Cloud Run, Colud SQL, GCS, Anthos]                    |510              |
+-----------+------------------+-------------------------------------------------------+-----------------+
