PySpark : Creating Ranges in PySpark DataFrame with Custom Start, End, and Increment Values


PySpark's built-in helpers for generating sequences, such as the range function, only work with integer values; for float increments PySpark doesn't provide such an option. As a workaround, we can apply a UDF (User-Defined Function) to create a list between start_val and end_val with increments of increment_val.

Here’s how to do it:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame
df = spark.createDataFrame([(1, 10, 2), (3, 6, 1), (10, 20, 5)], ['start_val', 'end_val', 'increment_val'])
# Define UDF to create the range (end + 1 makes end_val inclusive)
def create_range(start, end, increment):
    return list(range(start, end + 1, increment))
create_range_udf = udf(create_range, ArrayType(IntegerType()))
# Apply the UDF
df = df.withColumn('range', create_range_udf(df['start_val'], df['end_val'], df['increment_val']))
# Show the DataFrame
df.show(truncate=False)

This will create a new column called range in the DataFrame that contains a list from start_val to end_val with increments of increment_val.

Result

+---------+-------+-------------+---------------+
|start_val|end_val|increment_val|range          |
+---------+-------+-------------+---------------+
|1        |10     |2            |[1, 3, 5, 7, 9]|
|3        |6      |1            |[3, 4, 5, 6]   |
|10       |20     |5            |[10, 15, 20]   |
+---------+-------+-------------+---------------+
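
Since the motivation above was float increments, which the integer-only built-ins do not cover, the same UDF pattern can return an array of doubles. Below is a minimal sketch using the same SparkSession; the while-loop and the small tolerance for making end_val inclusive are illustrative choices, not part of the original example.

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
# Build a float sequence from start_val to end_val (inclusive) in steps of increment_val
def create_float_range(start, end, increment):
    values = []
    current = float(start)
    # small tolerance so accumulated floating-point error does not drop the last value
    while current <= end + 1e-9:
        values.append(current)
        current += increment
    return values
create_float_range_udf = udf(create_float_range, ArrayType(DoubleType()))
float_df = spark.createDataFrame([(0.5, 2.5, 0.5)], ['start_val', 'end_val', 'increment_val'])
float_df = float_df.withColumn('range', create_float_range_udf('start_val', 'end_val', 'increment_val'))
float_df.show(truncate=False)  # range column: [0.5, 1.0, 1.5, 2.0, 2.5]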

Remember that using Python UDFs might have a performance impact when dealing with large volumes of data, as data needs to be moved from the JVM to Python, which is an expensive operation. It is usually a good idea to profile your Spark application and ensure the performance is acceptable.
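
For integer steps specifically, note that Spark 2.4 and later also provides a built-in sequence() column function that builds the array natively on the JVM and is inclusive of the end value, so it avoids the Python UDF round-trip entirely. A minimal sketch against the same DataFrame (df_native is just an illustrative name):

from pyspark.sql.functions import sequence, col
# Natively computed equivalent of the UDF result for integer columns
df_native = df.withColumn('range', sequence(col('start_val'), col('end_val'), col('increment_val')))
df_native.show(truncate=False)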

Second option (not recommended; included for information only)

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
import numpy as np
# Start SparkSession
spark = SparkSession.builder \
    .appName('Array Sequence Generator') \
    .getOrCreate()
# Sample DataFrame
df = spark.createDataFrame([
    (1, 10, 2),
    (5, 20, 3),
    (0, 15, 5)
], ["start_val", "end_val", "increment_val"])
# Define UDF
def sequence_array(start, end, step):
    # np.arange is end-exclusive and yields numpy scalar values;
    # cast each element to a plain Python int so Spark can map it to IntegerType
    return [int(x) for x in np.arange(start, end, step)]
sequence_array_udf = udf(sequence_array, ArrayType(IntegerType()))
# Use the UDF
df = df.withColumn("sequence", sequence_array_udf(df.start_val, df.end_val, df.increment_val))
# Show the DataFrame
df.show(truncate=False)

In this example, the sequence_array function uses numpy's arange to generate a sequence of numbers from a given start, end, and step value, casting each element back to a plain Python int so it matches the declared IntegerType. The udf function then wraps it as a UDF that can be applied to PySpark DataFrame columns.

The DataFrame df is created with three columns: start_val, end_val, and increment_val. The UDF sequence_array_udf is then used to generate a new column “sequence” in the DataFrame, which contains arrays of numbers starting at start_val, ending at end_val (exclusive), and incrementing by increment_val.
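
One reason this route is discouraged, beyond pulling a numpy dependency onto every executor, is that np.arange yields numpy scalar values rather than plain Python ints, which PySpark's UDF serialization has not handled reliably for IntegerType results across versions; the int() conversion in the UDF above works around that. A quick illustration (the exact numpy dtype can vary by platform):

import numpy as np
arr = np.arange(1, 10, 2)
print(arr)                     # [1 3 5 7 9] -- the end value is exclusive
print(type(arr[0]))            # e.g. <class 'numpy.int64'>, not a Python int
print([int(x) for x in arr])   # [1, 3, 5, 7, 9] as plain Python ints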
