In this article we will see how to return the values of a column as an array of objects, with or without duplicate elements. In other words, we will convert a column's elements into an array of objects. To implement this we need two Spark SQL functions:
- pyspark.sql.functions.collect_list
- pyspark.sql.functions.collect_set
pyspark.sql.functions.collect_list
This is an aggregate function which returns a list of objects with duplicates retained.
pyspark.sql.functions.collect_set
This is also an aggregate function which returns a set of objects with duplicate elements eliminated.
Because the order of collected results depends on the order of the rows, which may change after a shuffle, both of the aforementioned functions are non-deterministic.
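If you need a stable ordering in the collected array, a common workaround is to sort the array after collecting it. A minimal sketch, assuming a DataFrame named df with a column tech_stack (the same names used in the sample below):
# Hedged sketch: make the collected result order-independent by sorting the array
from pyspark.sql.functions import collect_list, sort_array
df.agg(sort_array(collect_list('tech_stack')).alias('tech_stack_sorted')).show(truncate=False)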
Sample data (freshers_emp_tech.csv)
emp_id,name,cloud,tech_stack,hrly_min_pay,hrly_max_pay,country,status
1001,Sam Peter,AWS,EMR,150,168,USA,1
1004,Tim Moris,AWS,ECR,110,123.2,CHINA,1
1002,John Manual,AWS,Athena,150,168,INDIA,0
1003,Eric Burst,GCP,Cloud Run,120,134.4,USA,1
1004,Tim Moris,AWS,AWS Lambda,100,112,CHINA,1
1005,Jack Berry,AWS,EMR,110,123.2,INDIA,1
1001,Sam Peter,GCP,App Engine,120,134.4,USA,1
1002,John Manual,AWS,Redshift,150,168,INDIA,0
1001,Sam Peter,AWS,Redshift,150,168,USA,1
1001,Sam Peter,GCP,Big Query,100,112,USA,1
1003,Eric Burst,GCP,Cloud SQL,120,134.4,USA,1
1005,Jack Berry,AWS,Spark,110,123.2,INDIA,1
1003,Eric Burst,GCP,GCS,120,134.4,USA,1
1001,Sam Peter,GCP,DataProc,150,168,USA,1
1001,Sam Peter,AWS,Spark,100,112,USA,1
1004,Tim Moris,AWS,Docker,120,134.4,CHINA,1
1002,John Manual,AWS,Lambda,120,134.4,INDIA,0
1005,Jack Berry,AWS,Redshift,100,112,INDIA,1
1003,Eric Burst,GCP,Anthos,150,168,USA,1
Sample code
#We import the required libraries and read data from the location
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set
from pyspark.sql.functions import collect_list
spark = SparkSession.builder.appName("Sample code to demonstrate collect_set and collect_list @ freshers.in").getOrCreate()
df = spark.read.csv('/mnt/learning/PySpark/freshers_emp_tech.csv',header=True,inferSchema=True)
df.show()
+------+-----------+-----+----------+------------+------------+-------+------+
|emp_id| name|cloud|tech_stack|hrly_min_pay|hrly_max_pay|country|status|
+------+-----------+-----+----------+------------+------------+-------+------+
| 1001| Sam Peter| AWS| EMR| 150| 168.0| USA| 1|
| 1004| Tim Moris| AWS| ECR| 110| 123.2| CHINA| 1|
| 1002|John Manual| AWS| Athena| 150| 168.0| INDIA| 0|
| 1003| Eric Burst| GCP| Cloud Run| 120| 134.4| USA| 1|
| 1004| Tim Moris| AWS|AWS Lambda| 100| 112.0| CHINA| 1|
| 1005| Jack Berry| AWS| EMR| 110| 123.2| INDIA| 1|
| 1001| Sam Peter| GCP|App Engine| 120| 134.4| USA| 1|
| 1002|John Manual| AWS| Redshift| 150| 168.0| INDIA| 0|
| 1001| Sam Peter| AWS| Redshift| 150| 168.0| USA| 1|
| 1001| Sam Peter| GCP| Big Query| 100| 112.0| USA| 1|
| 1003| Eric Burst| GCP| Cloud SQL| 120| 134.4| USA| 1|
| 1005| Jack Berry| AWS| Spark| 110| 123.2| INDIA| 1|
| 1003| Eric Burst| GCP| GCS| 120| 134.4| USA| 1|
| 1001| Sam Peter| GCP| DataProc| 150| 168.0| USA| 1|
| 1001| Sam Peter| AWS| Spark| 100| 112.0| USA| 1|
| 1004| Tim Moris| AWS| Docker| 120| 134.4| CHINA| 1|
| 1002|John Manual| AWS| Lambda| 120| 134.4| INDIA| 0|
| 1005| Jack Berry| AWS| Redshift| 100| 112.0| INDIA| 1|
| 1003| Eric Burst| GCP| Anthos| 150| 168.0| USA| 1|
+------+-----------+-----+----------+------------+------------+-------+------+
df.printSchema()
root
|-- emp_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- cloud: string (nullable = true)
|-- tech_stack: string (nullable = true)
|-- hrly_min_pay: integer (nullable = true)
|-- hrly_max_pay: double (nullable = true)
|-- country: string (nullable = true)
|-- status: integer (nullable = true)
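If you would rather not depend on inferSchema, the schema above can also be declared explicitly before reading the file. A minimal sketch, mirroring the types shown by printSchema():
# Hedged sketch: read the same CSV with an explicit schema instead of inferSchema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
emp_schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("cloud", StringType(), True),
    StructField("tech_stack", StringType(), True),
    StructField("hrly_min_pay", IntegerType(), True),
    StructField("hrly_max_pay", DoubleType(), True),
    StructField("country", StringType(), True),
    StructField("status", IntegerType(), True),
])
df_typed = spark.read.csv('/mnt/learning/PySpark/freshers_emp_tech.csv', header=True, schema=emp_schema)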
#Basic example of how to use collect_list
df.agg(collect_list('tech_stack')).show(20,False)
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_list(tech_stack) |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[EMR, ECR, Athena, Cloud Run, AWS Lambda, EMR, App Engine, Redshift, Redshift, Big Query, Colud SQL, Spark, GCS, DataProc, Spark, Docker, Lambda, Redshift, Anthos]|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
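Since collect_list keeps every occurrence, wrapping it in size() simply gives back the number of input rows. A small hedged check on the same DataFrame:
# Hedged check: collect_list keeps duplicates, so its size equals the row count
from pyspark.sql.functions import size, collect_list
df.agg(size(collect_list('tech_stack')).alias('collected_count')).show()
# For this sample data the value should match df.count()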
#Basic example of how to use collect_set
df.agg(collect_set('tech_stack')).show(20,False)
+-----------------------------------------------------------------------------------------------------------------------------------+
|collect_set(tech_stack) |
+-----------------------------------------------------------------------------------------------------------------------------------+
|[Athena, Cloud SQL, Lambda, Docker, App Engine, Spark, Anthos, AWS Lambda, Big Query, Cloud Run, Redshift, DataProc, ECR, EMR, GCS]|
+-----------------------------------------------------------------------------------------------------------------------------------+
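Because collect_set eliminates duplicates, combining it with size() is a handy way to count distinct values per group. A hedged sketch on the same DataFrame:
# Hedged sketch: count distinct tech_stack values per cloud using collect_set + size
from pyspark.sql.functions import size, collect_set
df.groupBy('cloud').agg(size(collect_set('tech_stack')).alias('distinct_tech_count')).show()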
#Sample of how to do a sum on the DataFrame
df.groupBy(df.name).agg({"hrly_min_pay":"sum"}).show()
+-----------+-----------------+
| name|sum(hrly_min_pay)|
+-----------+-----------------+
| Sam Peter| 770|
|John Manual| 420|
| Jack Berry| 320|
| Tim Moris| 330|
| Eric Burst| 510|
+-----------+-----------------+
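The dictionary form of agg used above is concise, but it does not let you rename the result column. A hedged equivalent using the function form with an alias:
# Hedged equivalent of the sum above, with a friendlier column name
from pyspark.sql.functions import sum as spark_sum
df.groupBy(df.name).agg(spark_sum('hrly_min_pay').alias('total_hrly_min_pay')).show()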
#Sample code to use collect_list, collect_set and sum in a single step
df.groupBy(df.name).agg({"tech_stack":"collect_list","cloud":"collect_set","hrly_min_pay":"sum"}).show(20,False)
+-----------+------------------+-------------------------------------------------------+-----------------+
|name |collect_set(cloud)|collect_list(tech_stack) |sum(hrly_min_pay)|
+-----------+------------------+-------------------------------------------------------+-----------------+
|Sam Peter |[AWS, GCP] |[EMR, App Engine, Redshift, Big Query, DataProc, Spark]|770 |
|John Manual|[AWS] |[Athena, Redshift, Lambda] |420 |
|Jack Berry |[AWS] |[EMR, Spark, Redshift] |320 |
|Tim Moris |[AWS] |[ECR, AWS Lambda, Docker] |330 |
|Eric Burst |[GCP]             |[Cloud Run, Cloud SQL, GCS, Anthos]                    |510              |
+-----------+------------------+-------------------------------------------------------+-----------------+
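The same combined aggregation can be written with the function form, which also lets you alias each result column. A hedged equivalent of the step above:
# Hedged equivalent of the combined aggregation, with aliased result columns
from pyspark.sql.functions import collect_list, collect_set, sum as spark_sum
df.groupBy(df.name).agg(
    collect_set('cloud').alias('clouds'),
    collect_list('tech_stack').alias('tech_stacks'),
    spark_sum('hrly_min_pay').alias('total_hrly_min_pay')
).show(20, False)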