In this article, we work with a dataset containing names, ages, and timestamps. Our goal is to extract the individual time components from the timestamps, such as the hour, minute, second, and millisecond, and to convert the timestamps to a specific timezone. To achieve this, we will use PySpark and its SQL functions.
Prerequisites
- Python 3.7 or higher
- PySpark library
- Java 8 or higher
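If PySpark is not installed yet, a minimal environment check might look like the sketch below. It assumes a plain pip-based setup; the appName and master values are placeholders, so adjust them for your environment.
# pip install pyspark
from pyspark.sql import SparkSession

# Start a throwaway local session and print the Spark version to confirm the setup
spark = SparkSession.builder.master("local[*]").appName("env-check").getOrCreate()
print(spark.version)
spark.stop()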
Input Data
First, let’s load the dataset into a PySpark DataFrame:
#Extracting Time Components and Converting Timezones with PySpark
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
spark = SparkSession.builder \
    .appName("Time Components and Timezone Conversion @ Freshers.in") \
    .getOrCreate()
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("timestamp", TimestampType(), True)
])
# Alternatively, read the data from a CSV file:
# data = spark.read.csv("data.csv", header=True, inferSchema=True)
data = spark.createDataFrame([
    ("Sachin", 30, datetime.strptime("2022-12-01 12:30:15.123", "%Y-%m-%d %H:%M:%S.%f")),
    ("Wilson", 25, datetime.strptime("2023-01-10 16:45:35.789", "%Y-%m-%d %H:%M:%S.%f")),
    ("Johnson", 35, datetime.strptime("2023-02-07 09:15:30.246", "%Y-%m-%d %H:%M:%S.%f"))
], schema)
data.printSchema()
data.show(20, False)
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
+-------+---+-----------------------+
|name   |age|timestamp              |
+-------+---+-----------------------+
|Sachin |30 |2022-12-01 12:30:15.123|
|Wilson |25 |2023-01-10 16:45:35.789|
|Johnson|35 |2023-02-07 09:15:30.246|
+-------+---+-----------------------+
from pyspark.sql.functions import (
    hour, minute, second, year, month, dayofmonth, weekofyear, quarter, substring)
data.withColumn("hour", hour("timestamp")).show(20, False)
+-------+---+-----------------------+----+
|name   |age|timestamp              |hour|
+-------+---+-----------------------+----+
|Sachin |30 |2022-12-01 12:30:15.123|12  |
|Wilson |25 |2023-01-10 16:45:35.789|16  |
|Johnson|35 |2023-02-07 09:15:30.246|9   |
+-------+---+-----------------------+----+
data.withColumn("minute", minute("timestamp")).show(20, False)
+-------+---+-----------------------+------+
|name   |age|timestamp              |minute|
+-------+---+-----------------------+------+
|Sachin |30 |2022-12-01 12:30:15.123|30    |
|Wilson |25 |2023-01-10 16:45:35.789|45    |
|Johnson|35 |2023-02-07 09:15:30.246|15    |
+-------+---+-----------------------+------+
data.withColumn("second", second("timestamp")).show(20, False)
+-------+---+-----------------------+------+
|name   |age|timestamp              |second|
+-------+---+-----------------------+------+
|Sachin |30 |2022-12-01 12:30:15.123|15    |
|Wilson |25 |2023-01-10 16:45:35.789|35    |
|Johnson|35 |2023-02-07 09:15:30.246|30    |
+-------+---+-----------------------+------+
data.withColumn("millisecond", (substring("timestamp", 21, 3)).cast("int")).show(20, False)
+-------+---+-----------------------+-----------+
|name |age|timestamp |millisecond|
+-------+---+-----------------------+-----------+
|Alice |30 |2022-12-01 12:30:15.123|123 |
|Bob |25 |2023-01-10 16:45:35.789|789 |
|Charlie|35 |2023-02-07 09:15:30.246|246 |
+-------+---+-----------------------+-----------+
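The substring approach depends on the timestamp's string rendering, which trims trailing zeros from the fraction (so .120 would print as .12 and break the fixed offset). A sketch of a pattern-based alternative, assuming Spark 3's datetime patterns (where S is the fraction-of-second field):
from pyspark.sql.functions import date_format

# "SSS" renders the fraction of second as exactly three digits (the milliseconds)
data.withColumn("millisecond", date_format("timestamp", "SSS").cast("int")).show(20, False)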
data.withColumn("year", year("timestamp")).show(20, False)
+-------+---+-----------------------+----+
|name   |age|timestamp              |year|
+-------+---+-----------------------+----+
|Sachin |30 |2022-12-01 12:30:15.123|2022|
|Wilson |25 |2023-01-10 16:45:35.789|2023|
|Johnson|35 |2023-02-07 09:15:30.246|2023|
+-------+---+-----------------------+----+
data.withColumn("month", month("timestamp")).show(20, False)
+-------+---+-----------------------+-----+
|name   |age|timestamp              |month|
+-------+---+-----------------------+-----+
|Sachin |30 |2022-12-01 12:30:15.123|12   |
|Wilson |25 |2023-01-10 16:45:35.789|1    |
|Johnson|35 |2023-02-07 09:15:30.246|2    |
+-------+---+-----------------------+-----+
data.withColumn("day", dayofmonth("timestamp")).show(20, False)
+-------+---+-----------------------+---+
|name   |age|timestamp              |day|
+-------+---+-----------------------+---+
|Sachin |30 |2022-12-01 12:30:15.123|1  |
|Wilson |25 |2023-01-10 16:45:35.789|10 |
|Johnson|35 |2023-02-07 09:15:30.246|7  |
+-------+---+-----------------------+---+
data.withColumn("week", weekofyear("timestamp")).show(20, False)
+-------+---+-----------------------+----+
|name   |age|timestamp              |week|
+-------+---+-----------------------+----+
|Sachin |30 |2022-12-01 12:30:15.123|48  |
|Wilson |25 |2023-01-10 16:45:35.789|2   |
|Johnson|35 |2023-02-07 09:15:30.246|6   |
+-------+---+-----------------------+----+
data.withColumn("quarter", quarter("timestamp")).show(20, False)
+-------+---+-----------------------+-------+
|name   |age|timestamp              |quarter|
+-------+---+-----------------------+-------+
|Sachin |30 |2022-12-01 12:30:15.123|4      |
|Wilson |25 |2023-01-10 16:45:35.789|1      |
|Johnson|35 |2023-02-07 09:15:30.246|1      |
+-------+---+-----------------------+-------+
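Adding one column at a time keeps each step easy to follow, but all of these components can equally be extracted in a single pass. The following is a compact equivalent of the steps above, using only the functions already imported:
data.select(
    "name",
    "timestamp",
    hour("timestamp").alias("hour"),
    minute("timestamp").alias("minute"),
    second("timestamp").alias("second"),
    year("timestamp").alias("year"),
    month("timestamp").alias("month"),
    dayofmonth("timestamp").alias("day"),
    weekofyear("timestamp").alias("week"),
    quarter("timestamp").alias("quarter")
).show(20, False)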
To convert the timestamps to a specific timezone, we use the PySpark SQL from_utc_timestamp function, which interprets the input as UTC and returns the corresponding wall-clock time in the target timezone. In this example, we convert the timestamps to the ‘America/New_York’ timezone:
from pyspark.sql.functions import from_utc_timestamp
data.withColumn("timestamp_local", from_utc_timestamp("timestamp", "America/New_York")).show(20, False)
+-------+---+-----------------------+-----------------------+
|name   |age|timestamp              |timestamp_local        |
+-------+---+-----------------------+-----------------------+
|Sachin |30 |2022-12-01 12:30:15.123|2022-12-01 07:30:15.123|
|Wilson |25 |2023-01-10 16:45:35.789|2023-01-10 11:45:35.789|
|Johnson|35 |2023-02-07 09:15:30.246|2023-02-07 04:15:30.246|
+-------+---+-----------------------+-----------------------+
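The inverse operation is the standard PySpark SQL function to_utc_timestamp, which treats the input as wall-clock time in the given timezone and converts it back to UTC. As a quick round-trip sketch:
from pyspark.sql.functions import to_utc_timestamp

# Interpret the New York local times as such and convert them back to UTC;
# timestamp_utc should match the original timestamp column
data.withColumn("timestamp_local", from_utc_timestamp("timestamp", "America/New_York")) \
    .withColumn("timestamp_utc", to_utc_timestamp("timestamp_local", "America/New_York")) \
    .show(20, False)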