How to parse a column containing a JSON string using PySpark (from_json)

PySpark @ Freshers.in

from_json

If you have a JSON object stored in a column and need to apply any transformation to it, you can use from_json. from_json parses a column containing a JSON string into a MapType with StringType keys, a StructType, or an ArrayType, following the specified schema. If it encounters an unparsable value, it returns null.

Function : pyspark.sql.functions.from_json(col, schema, options=None)
Syntax : from_json(col, schema, options=None)

Note : 

  1. “col” should be well formed with respect to schema and options
  2. It is not possible for map values to have different types. Use a struct for this situation.

Example code

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import MapType, StringType, IntegerType, StructType, StructField
spark = SparkSession.builder.appName('www.freshers.in training').getOrCreate()
data = [
(1,"""{"country":"USA","country_id":100}""","USD"),
(2,"""{"country":"UK","country_id":200}""","EURO"),
(3,"""{"country":"INDIA","country_id":300}""","INR")]
schema= ["sino","cntry_json","currency"]
df = spark.createDataFrame(data,schema)
df.show(20,False)
+----+------------------------------------+--------+
|sino|cntry_json |currency|
+----+------------------------------------+--------+
|1 |{"country":"USA","country_id":100} |USD |
|2 |{"country":"UK","country_id":200} |EURO |
|3 |{"country":"INDIA","country_id":300}|INR |
+----+------------------------------------+--------+
df.printSchema()
root
|-- sino: long (nullable = true)
|-- cntry_json: string (nullable = true)
|-- currency: string (nullable = true)
#Converting using from_json
df2 = df.select(df.sino,from_json(df.cntry_json,MapType(StringType(),StringType())),df.currency)
df2.show(20,False)
+----+-------------------------------------+--------+
|sino|entries |currency|
+----+-------------------------------------+--------+
|1 |[country -> USA, country_id -> 100] |USD |
|2 |[country -> UK, country_id -> 200] |EURO |
|3 |[country -> INDIA, country_id -> 300]|INR |
+----+-------------------------------------+--------+
df2.printSchema()
root
|-- sino: long (nullable = true)
|-- entries: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- currency: string (nullable = true)
#Here we are defining the schema for the JSON column
schema2 = StructType([ 
StructField("country",StringType(),True), 
StructField("country_id",IntegerType(),True), 
])
df3 = df.withColumn("cntry_json",from_json(df.cntry_json,schema2))
df3.show()
+----+------------+--------+
|sino| cntry_json|currency|
+----+------------+--------+
| 1| [USA, 100]| USD|
| 2| [UK, 200]| EURO|
| 3|[INDIA, 300]| INR|
+----+------------+--------+
df3.printSchema()
root
|-- sino: long (nullable = true)
|-- cntry_json: struct (nullable = true)
| |-- country: string (nullable = true)
| |-- country_id: integer (nullable = true)
|-- currency: string (nullable = true)
