How to create a table from CSV file and write SQL on top of it in Spark (Sample code)

PySpark @ Freshers.in

In this article you will see how to read a CSV file using PySpark, how to control the header option on read, and how to infer the schema while reading. Spark has an excellent feature to infer the schema, so we do not need to worry about the data types of the source feed. Next you will learn how to create a table on top of the data set you read, and how to query that table.

Sample Data

sino,name,date,age,amount,status
1,Sam Peter,08-07-2022,7,1000,1
2,John Manual,15-06-2022,12,3500,2
3,Eric Burst,08-07-2022,6,2250,2
4,Tim Moris,08-09-2022,8,3100,2
5,Jack Berry,08-11-2022,10,1050,3

Sample PySpark Code 

# Import SparkSession, the entry point to DataFrame and SQL functionality.

from pyspark.sql import SparkSession

# Get an existing SparkSession or, if there is none, create a new one.

spark = SparkSession.builder.appName("read from csv and convert to table @ freshers.in").getOrCreate()

# Read the CSV from the local file system, with header=True and schema inference.
# If the file is in S3 or HDFS, give that path instead.

df = spark.read.csv('D:\\Learning\\PySpark\\freshers_sample.csv',header=True,inferSchema=True)

# Display the inferred schema of the DataFrame.

df.printSchema()
root
 |-- sino: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- status: integer (nullable = true)

# Display the content of the DataFrame read from the CSV.

df.show()
+----+-----------+----------+---+------+------+
|sino|       name|      date|age|amount|status|
+----+-----------+----------+---+------+------+
|   1|  Sam Peter|08-07-2022|  7|  1000|     1|
|   2|John Manual|15-06-2022| 12|  3500|     2|
|   3| Eric Burst|08-07-2022|  6|  2250|     2|
|   4|  Tim Moris|08-09-2022|  8|  3100|     2|
|   5| Jack Berry|08-11-2022| 10|  1050|     3|
+----+-----------+----------+---+------+------+

# createOrReplaceTempView creates (or replaces) a local temporary view from this DataFrame.

df.createOrReplaceTempView('freshers_table')

# Query 1 on the temporary view created on top of the CSV file.

spark.sql("select * from freshers_table").show()
+----+-----------+----------+---+------+------+
|sino|       name|      date|age|amount|status|
+----+-----------+----------+---+------+------+
|   1|  Sam Peter|08-07-2022|  7|  1000|     1|
|   2|John Manual|15-06-2022| 12|  3500|     2|
|   3| Eric Burst|08-07-2022|  6|  2250|     2|
|   4|  Tim Moris|08-09-2022|  8|  3100|     2|
|   5| Jack Berry|08-11-2022| 10|  1050|     3|
+----+-----------+----------+---+------+------+

# Query 2: total amount per status.

spark.sql("select sum(amount),status from freshers_table group by status").show()
+-----------+------+
|sum(amount)|status|
+-----------+------+
|       1000|     1|
|       1050|     3|
|       8850|     2|
+-----------+------+
