PySpark : When are new Stages created in the Spark DAG?

PySpark @ Freshers.in

Apache Spark’s computational model is based on a Directed Acyclic Graph (DAG). When you perform operations on a DataFrame or RDD (Resilient Distributed Dataset) in Spark, these operations are not executed immediately. Instead, they’re converted into a series of stages in the DAG, which will later be executed in a certain order. Understanding when and why new stages are created is vital for tuning and optimizing Spark applications.

Spark Execution 

Before we delve into stages, it’s essential to understand the difference between transformations and actions in Spark.

Transformations: These operations create a new DataFrame or RDD from an existing one, like map, filter, or groupBy. Transformations are lazy, meaning they’re not computed until an action is called.

Actions: These operations return a value to the driver program or write data to an external storage system. Examples include count, first, and saveAsTextFile.

Creation of New Stages

The primary reason for the creation of a new stage in the DAG is the presence of a wide transformation. In PySpark terms:

Narrow Transformations: These transformations compute the partitions of the child RDD based on the partitions of the parent RDD without reshuffling. Examples include map and filter.

Wide Transformations: These are the transformations that produce an RDD by reshuffling the partitions of the parent RDD. Examples include groupBy and reduceByKey.

Whenever a wide transformation is encountered, a new stage is created in the DAG.

Example using PySpark

Let’s walk through an example using PySpark DataFrames:

from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder.appName("freshers.in Learning @ DAG_Example").getOrCreate()

# Sample DataFrame for learning @ Freshers.in 
freshers_in_data = [("Sachin", "Computer Science", 85),
                   ("Vivek", "Mechanical Engineering", 90),
                   ("Ram", "Electrical Engineering", 88),
                   ("Iqbal", "Computer Science", 80)]
columns = ["Name", "Branch", "Score"]

freshers_in_df = spark.createDataFrame(freshers_in_data, columns)

# Transformation: Filtering rows where Score is greater than 85
freshers_in_filtered = freshers_in_df.filter(freshers_in_df.Score > 85)

# Wide Transformation: Grouping by Branch and calculating average score
freshers_in_avg_scores = freshers_in_filtered.groupBy("Branch").avg("Score")

# Action: Displaying the result
freshers_in_avg_scores.show()

In the above PySpark code:

The filter transformation is a narrow transformation, so no new stage is created.
The groupBy is a wide transformation. Therefore, a new stage is created at this step.

Spark important urls to refer

  1. Spark Examples
  2. PySpark Blogs
  3. Bigdata Blogs
  4. Spark Interview Questions
  5. Official Page
Author: user

Leave a Reply