How to create a UDF in PySpark? What are the different ways you can call a PySpark UDF? (With example)


PySpark UDF

A PySpark UDF (User Defined Function) lets you develop a reusable function in Spark and extend PySpark's built-in capabilities. A UDF defines a new column-based function that can be applied to a DataFrame, and once created it can be reused across multiple DataFrames. If no built-in function covers your logic, you can create a UDF and apply it accordingly. There are multiple ways to define and call a UDF; three methods are shown below. In the example we have a simple employee table, and we want to calculate the total salary by combining the salary and commission columns.

Important Note

  1. UDFs are among the most expensive operations in Spark, because each row must be serialized and shipped to the Python interpreter.
  2. Create a UDF only when you have no other way to implement your logic.
  3. Before you create a UDF, make sure a similar function is not already available among the built-in Spark SQL functions (see the sketch after this list).
  4. The default return datatype of udf() is StringType.
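
To illustrate note 3, this particular calculation does not actually need a UDF at all: it can be written with built-in column arithmetic, which runs inside the JVM and avoids the UDF overhead. A minimal sketch, assuming the state_data_df DataFrame created in the sample code below:

#No-UDF alternative using built-in column arithmetic
from pyspark.sql.functions import col
state_data_df.withColumn(
    "Total Salary",
    col("salary") + (col("salary") * col("commission")) / 100
).show(20,False)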

Sample Code

#Example
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField
from pyspark.sql.types import IntegerType,StringType,FloatType
from pyspark.sql.functions import udf,col
#Create the SparkSession (already available as spark in the PySpark shell)
spark = SparkSession.builder.appName("udf_example").getOrCreate()
emp_data = [
(1,"Sam",10000,12.0,"Los Angeles County"),
(2,"Peter",24000,6.0,"Los Angeles County"),
(3,"John",9000,8.5,"Los Angeles County"),
(4,"Jaison",12000,11.0,"Los Angeles County"),
(5,"Mike",15000,22.0,"Los Angeles County")]
emp_schema=StructType([
StructField("si_no",IntegerType(),True),
StructField("name",StringType(),True),
StructField("salary",IntegerType(),True),
StructField("commission",FloatType(),True),
StructField("county",StringType(),True)])
state_data_df = spark.createDataFrame(data=emp_data,schema=emp_schema)
state_data_df.show(20,False)
+-----+------+------+----------+------------------+
|si_no|name  |salary|commission|county            |
+-----+------+------+----------+------------------+
|1    |Sam   |10000 |12.0      |Los Angeles County|
|2    |Peter |24000 |6.0       |Los Angeles County|
|3    |John  |9000  |8.5       |Los Angeles County|
|4    |Jaison|12000 |11.0      |Los Angeles County|
|5    |Mike  |15000 |22.0      |Los Angeles County|
+-----+------+------+----------+------------------+
# Function to calculate total salary using two columns salary and commission
# This is the function we are using as UDF for Method I and Method II
def total_salary(sal,comm):
    tSal = sal + ((sal * comm)/100)
    return tSal
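Since total_salary is plain Python, it can be sanity-checked locally before wiring it into Spark; a quick check against the first row:

#Quick local check of the plain Python function
print(total_salary(10000, 12.0))  # 11200.0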
#Method I - Creating sal_udf UDF 
sal_udf = udf(lambda x,y: total_salary(x,y),FloatType())
state_data_df.select(col("si_no"),col("name"),col("salary"),col("commission"),sal_udf(col("salary"),col("commission")).alias("Total Salary")).show(20,False)
+-----+------+------+----------+------------+
|si_no|name  |salary|commission|Total Salary|
+-----+------+------+----------+------------+
|1    |Sam   |10000 |12.0      |11200.0     |
|2    |Peter |24000 |6.0       |25440.0     |
|3    |John  |9000  |8.5       |9765.0      |
|4    |Jaison|12000 |11.0      |13320.0     |
|5    |Mike  |15000 |22.0      |18300.0     |
+-----+------+------+----------+------------+
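Note 4 above is worth demonstrating here: had sal_udf been created without the explicit FloatType() return type, the new column would come back as strings. A minimal sketch of the difference, using the same function:

#Without a returnType, udf() defaults to StringType
sal_udf_str = udf(lambda x,y: total_salary(x,y))
state_data_df.select(sal_udf_str(col("salary"),col("commission")).alias("Total Salary")).printSchema()
#root
# |-- Total Salary: string (nullable = true)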
##Method II - Using the UDF in Spark SQL; we register and reuse the same total_salary function from above
spark.udf.register("sal_udf_method_2", total_salary,FloatType())
state_data_df.createOrReplaceTempView("emp_table")
spark.sql("select si_no,name,salary,commission,sal_udf_method_2(salary,commission) as total_salary from emp_table").show(20,False)
+-----+------+------+----------+------------+
|si_no|name  |salary|commission|total_salary|
+-----+------+------+----------+------------+
|1    |Sam   |10000 |12.0      |11200.0     |
|2    |Peter |24000 |6.0       |25440.0     |
|3    |John  |9000  |8.5       |9765.0      |
|4    |Jaison|12000 |11.0      |13320.0     |
|5    |Mike  |15000 |22.0      |18300.0     |
+-----+------+------+----------+------------+
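spark.udf.register also returns the registered function, so the same UDF remains usable from the DataFrame API, either directly or by name through expr(). A minimal sketch, assuming the registration above has already run:

#Calling the SQL-registered UDF from the DataFrame API via expr()
from pyspark.sql.functions import expr
state_data_df.select("si_no","name",expr("sal_udf_method_2(salary, commission)").alias("total_salary")).show(20,False)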
#Method III : udf can also be used as a decorator
@udf(returnType=FloatType())
def total_salary_calc(sal,comm):
    tSal = sal + ((sal * comm)/100)
    return tSal

state_data_df.select(col("si_no"),col("name"),col("salary"),col("commission"),total_salary_calc(col("salary"),col("commission")).alias("Total Salary")).show(20,False)
+-----+------+------+----------+------------+
|si_no|name  |salary|commission|Total Salary|
+-----+------+------+----------+------------+
|1    |Sam   |10000 |12.0      |11200.0     |
|2    |Peter |24000 |6.0       |25440.0     |
|3    |John  |9000  |8.5       |9765.0      |
|4    |Jaison|12000 |11.0      |13320.0     |
|5    |Mike  |15000 |22.0      |18300.0     |
+-----+------+------+----------+------------+
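
Given the per-row serialization cost called out in note 1, a vectorized pandas UDF is often the faster alternative when the logic can operate on whole columns at once. A minimal sketch of the same calculation as a pandas UDF; this assumes Spark 3.x with pyarrow installed:

#Vectorized pandas UDF: operates on whole pandas Series instead of row by row
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(FloatType())
def total_salary_vec(sal: pd.Series, comm: pd.Series) -> pd.Series:
    return sal + (sal * comm) / 100

state_data_df.select("si_no","name",total_salary_vec(col("salary"),col("commission")).alias("Total Salary")).show(20,False)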
